ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.10
Committed: Fri Jul 3 02:55:10 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.9: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
/* thread stack size: room for 1024 pointers.
   Defined before xthread.h is included - presumably consumed there (confirm).
   Parenthesised so the expansion stays a single operand in any expression. */
#define X_STACKSIZE (1024 * sizeof (void *))
8 root 1.1
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
/* on win32, supply a placeholder sigset_t and make pthread_sigmask ()
   calls expand to nothing */
#if defined (_WIN32)
typedef char sigset_t;
# define pthread_sigmask(how,set,oldset)
#endif
18    
/* fallbacks for when the perl headers do not define the _NN refcount
   macros: map them to the plain variants */
#if !defined (SvREFCNT_dec_NN)
# define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
#endif

#if !defined (SvREFCNT_inc_NN)
# define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
#endif
26    
27 root 1.1 static pthread_key_t current_key;
28    
29     static s_epipe ep;
30     static void *perl_thx;
31     static sigset_t cursigset, fullsigset;
32    
/* default setting, read and changed through enable ();
   pmapi_release consults it only while thread_enable is 0 */
static int global_enable = 0;

/* scoped override set by set_thread_enable:
   0 undefined (use global_enable), 1 enabled (scoped_enable),
   2 disabled (scoped_disable) - pmapi_release tests bit 0 */
static int thread_enable;
35    
36 root 1.6 /* assigned to a thread for each release/acquire */
37 root 1.1 struct tctx
38     {
39     void *coro;
40 root 1.6 int wait_f;
41 root 1.9 xcond_t acquire_c;
42 root 1.1 };
43    
44     static struct tctx *tctx_free;
45    
46     static struct tctx *
47     tctx_get (void)
48     {
49     struct tctx *ctx;
50    
51     if (!tctx_free)
52 root 1.3 {
53     ctx = malloc (sizeof (*tctx_free));
54 root 1.9 X_COND_CREATE (ctx->acquire_c);
55 root 1.3 }
56 root 1.1 else
57     {
58     ctx = tctx_free;
59     tctx_free = tctx_free->coro;
60     }
61    
62     return ctx;
63     }
64    
65     static void
66     tctx_put (struct tctx *ctx)
67     {
68     ctx->coro = tctx_free;
69     tctx_free = ctx;
70     }
71    
/* a stack of tctxs, kept in a growable array */
struct tctxs
{
  struct tctx **ctxs; /* the slots, (re)allocated by tctxs_put */
  int cur, max;       /* slots in use, slots allocated */
};

/*
 * Pop and return the most recently pushed context.
 *
 * Returns 0 when the stack is empty, instead of reading in front of the
 * array and leaving cur negative.
 */
static struct tctx *
tctxs_get (struct tctxs *ctxs)
{
  if (ctxs->cur <= 0)
    return 0;

  return ctxs->ctxs[--ctxs->cur];
}

/*
 * Push ctx onto the stack.
 *
 * When the array is full it is grown first: 16 slots initially, doubled
 * on every later growth. The stack is only updated once realloc has
 * succeeded; running out of memory aborts, as the function cannot report
 * failure to its caller.
 */
static void
tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
{
  if (ctxs->cur >= ctxs->max)
    {
      int max = ctxs->max ? ctxs->max * 2 : 16;
      struct tctx **slots = realloc (ctxs->ctxs, max * sizeof (ctxs->ctxs[0]));

      if (!slots)
        {
          fputs ("Coro::Multicore: out of memory growing context stack.\n", stderr);
          abort ();
        }

      ctxs->ctxs = slots;
      ctxs->max = max;
    }

  ctxs->ctxs[ctxs->cur++] = ctx;
}
96    
97     static xmutex_t release_m = X_MUTEX_INIT;
98     static xcond_t release_c = X_COND_INIT;
99     static struct tctxs releasers;
100     static int idle;
101     static int min_idle = 1;
102     static int curthreads, max_threads = 1; /* protected by release_m */
103    
104     static xmutex_t acquire_m = X_MUTEX_INIT;
105     static struct tctxs acquirers;
106    
107 root 1.1 X_THREAD_PROC(thread_proc)
108     {
109     PERL_SET_CONTEXT (perl_thx);
110    
111     {
112     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
113     struct tctx *ctx;
114    
115 root 1.9 X_LOCK (release_m);
116 root 1.6
117 root 1.1 for (;;)
118     {
119 root 1.9 while (!releasers.cur)
120 root 1.6 if (idle <= min_idle || 1)
121 root 1.9 X_COND_WAIT (release_c, release_m);
122 root 1.6 else
123     {
124     struct timespec ts = { time (0) + idle - min_idle, 0 };
125    
126 root 1.9 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
127     if (idle > min_idle && !releasers.cur)
128 root 1.6 break;
129     }
130    
131 root 1.9 ctx = tctxs_get (&releasers);
132 root 1.6 --idle;
133 root 1.9 X_UNLOCK (release_m);
134 root 1.3
135 root 1.6 if (!ctx) /* timed out? */
136     break;
137    
138 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
139    
140     while (ctx->coro)
141     CORO_SCHEDULE;
142    
143 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
144    
145 root 1.9 X_LOCK (acquire_m);
146 root 1.3 ctx->wait_f = 1;
147 root 1.9 X_COND_SIGNAL (ctx->acquire_c);
148     X_UNLOCK (acquire_m);
149 root 1.1
150 root 1.9 X_LOCK (release_m);
151 root 1.6 ++idle;
152 root 1.1 }
153     }
154     }
155    
156     static void
157     start_thread (void)
158     {
159     xthread_t tid;
160    
161 root 1.9 if (curthreads >= max_threads && 0)
162     return;
163    
164     ++curthreads;
165 root 1.6 ++idle;
166 root 1.1 xthread_create (&tid, thread_proc, 0);
167     }
168    
169     static void
170     pmapi_release (void)
171     {
172 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
173     {
174     pthread_setspecific (current_key, 0);
175     return;
176     }
177    
178 root 1.1 struct tctx *ctx = tctx_get ();
179     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
180 root 1.3 ctx->wait_f = 0;
181    
182 root 1.1 pthread_setspecific (current_key, ctx);
183     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
184    
185 root 1.9 X_LOCK (release_m);
186 root 1.6
187     if (idle <= min_idle)
188 root 1.1 start_thread ();
189    
190 root 1.9 tctxs_put (&releasers, ctx);
191     X_COND_SIGNAL (release_c);
192    
193     while (!idle && releasers.cur)
194     {
195     X_UNLOCK (release_m);
196     X_LOCK (release_m);
197     }
198 root 1.6
199 root 1.9 X_UNLOCK (release_m);
200 root 1.1 }
201    
202     static void
203     pmapi_acquire (void)
204     {
205     struct tctx *ctx = pthread_getspecific (current_key);
206    
207 root 1.4 if (!ctx)
208     return;
209    
210 root 1.9 X_LOCK (acquire_m);
211 root 1.1
212 root 1.9 tctxs_put (&acquirers, ctx);
213 root 1.1
214     s_epipe_signal (&ep);
215 root 1.3 while (!ctx->wait_f)
216 root 1.9 X_COND_WAIT (ctx->acquire_c, acquire_m);
217     X_UNLOCK (acquire_m);
218 root 1.1
219     tctx_put (ctx);
220     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
221     }
222    
223 root 1.4 static void
224 root 1.6 set_thread_enable (pTHX_ void *arg)
225 root 1.4 {
226 root 1.6 thread_enable = PTR2IV (arg);
227 root 1.4 }
228    
229 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
230    
231     PROTOTYPES: DISABLE
232    
BOOT:
{
#ifndef _WIN32
        /* the "block everything" mask installed while perl is released */
        sigfillset (&fullsigset);
#endif
        pthread_key_create (&current_key, 0);

        if (s_epipe_new (&ep) != 0)
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so thread_proc can install it in the worker threads */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* start worker threads until min_idle of them are counted as idle */
        X_LOCK (release_m);
        for (; idle < min_idle; )
          start_thread ();
        X_UNLOCK (release_m);

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
258    
bool
enable (bool enable = NO_INIT)
	CODE:
        /* always returns the previous global setting; the setting is only
           replaced when an argument was passed */
        RETVAL = global_enable;
        if (items != 0)
          global_enable = enable;
	OUTPUT:
        RETVAL
267    
void
scoped_enable ()
	CODE:
        /* registers set_thread_enable with argument 1 and with argument 0
           as the two callbacks of a Coro enter/leave scope hook.
           The hook is set up between LEAVE and ENTER - presumably so it
           lands in the caller's scope; see Guard.xs. */
        LEAVE;
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
        ENTER;
274    
void
scoped_disable ()
	CODE:
        /* registers set_thread_enable with argument 2 and with argument 0
           as the two callbacks of a Coro enter/leave scope hook.
           The hook is set up between LEAVE and ENTER - presumably so it
           lands in the caller's scope; see Guard.xs. */
        LEAVE;
        CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
        ENTER;
281    
U32
min_idle_threads (U32 min = NO_INIT)
	CODE:
        /* returns the previous minimum; the value is only replaced when an
           argument was passed.
           min_idle is read by pmapi_release, thread_proc and BOOT with
           release_m held, so that is the mutex to take here as well. */
        X_LOCK (release_m);
        RETVAL = min_idle;
        if (items)
          min_idle = min;
        X_UNLOCK (release_m);
	OUTPUT:
        RETVAL
292    
293    
int
fd ()
	CODE:
        /* file descriptor of the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL
300    
void
poll (...)
	CODE:
        /* drain the event pipe, then for every context queued by
           pmapi_acquire: mark its coro ready, drop the reference taken in
           pmapi_release and clear ctx->coro, which ends the scheduling
           loop of the worker thread serving that context */
        s_epipe_drain (&ep);
        X_LOCK (acquire_m);
        for (; acquirers.cur; )
          {
            struct tctx *ctx = tctxs_get (&acquirers);
            SV *coro = (SV *)ctx->coro;

            CORO_READY (coro);
            SvREFCNT_dec_NN (coro);
            ctx->coro = 0;
          }
        X_UNLOCK (acquire_m);
314 root 1.1
void
sleep (NV seconds)
	CODE:
        {
          /* negative and NaN durations count as zero */
          NV left = seconds > 0. ? seconds : 0.;

          perlinterp_release ();

          /* usleep is only specified for intervals below one second and
             takes an integer argument, so sleep in chunks of at most half
             a second instead of converting the whole duration at once */
          while (left > 0.)
            {
              NV chunk = left < .5 ? left : .5;

              usleep ((unsigned long)(chunk * 1e6));
              left -= chunk;
            }

          perlinterp_acquire ();
        }
321 root 1.2