ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.5
Committed: Sun Jun 28 18:43:17 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.4: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
25 static int global_enable = 1;
26 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27
28 struct tctx
29 {
30 void *coro;
31 xcond_t wait_c;
32 int wait_f;
33 };
34
35 static struct tctx *tctx_free;
36
37 static int idle_timeout;
38 static int idle;
39 static int max_idle = 8;
40
41 static xmutex_t perl_m = X_MUTEX_INIT;
42 static xcond_t perl_c = X_COND_INIT;
43 static struct tctx *perl_f;
44
45 static xmutex_t wait_m = X_MUTEX_INIT;
46
47 static int wakeup_f;
48 static struct tctx **waiters;
49 static int waiters_count, waiters_max;
50
51 static struct tctx *
52 tctx_get (void)
53 {
54 struct tctx *ctx;
55
56 if (!tctx_free)
57 {
58 ctx = malloc (sizeof (*tctx_free));
59 X_COND_CREATE (ctx->wait_c);
60 }
61 else
62 {
63 ctx = tctx_free;
64 tctx_free = tctx_free->coro;
65 }
66
67 return ctx;
68 }
69
70 static void
71 tctx_put (struct tctx *ctx)
72 {
73 ctx->coro = tctx_free;
74 tctx_free = ctx;
75 }
76
77 X_THREAD_PROC(thread_proc)
78 {
79 PERL_SET_CONTEXT (perl_thx);
80
81 {
82 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
83 struct tctx *ctx;
84
85 for (;;)
86 {
87 /* TODO: should really use some idle time and exit after that */
88 X_LOCK (perl_m);
89 while (!perl_f)
90 X_COND_WAIT (perl_c, perl_m);
91 ctx = perl_f;
92 perl_f = 0;
93 --available;
94 X_UNLOCK (perl_m);
95
96 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
97
98 while (ctx->coro)
99 CORO_SCHEDULE;
100
101 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
102
103 X_LOCK (wait_m);
104 ctx->wait_f = 1;
105 X_COND_SIGNAL (ctx->wait_c);
106
107 if (available >= max_idle)
108 {
109 X_UNLOCK (wait_m);
110 break;
111 }
112
113 ++available;
114 X_UNLOCK (wait_m);
115 }
116 }
117 }
118
119 static void
120 start_thread (void)
121 {
122 xthread_t tid;
123
124 ++available;
125 xthread_create (&tid, thread_proc, 0);
126 }
127
128 static void
129 pmapi_release (void)
130 {
131 if (!(thread_enable ? thread_enable & 1 : global_enable))
132 {
133 pthread_setspecific (current_key, 0);
134 return;
135 }
136
137 struct tctx *ctx = tctx_get ();
138 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
139 ctx->wait_f = 0;
140
141 pthread_setspecific (current_key, ctx);
142 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
143
144 if (!available)
145 start_thread ();
146
147 X_LOCK (perl_m);
148 perl_f = ctx;
149 X_COND_SIGNAL (perl_c);
150 X_UNLOCK (perl_m);
151 }
152
153 static void
154 pmapi_acquire (void)
155 {
156 struct tctx *ctx = pthread_getspecific (current_key);
157
158 if (!ctx)
159 return;
160
161 X_LOCK (wait_m);
162
163 if (waiters_count >= waiters_max)
164 {
165 waiters_max = waiters_max ? waiters_max * 2 : 16;
166 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
167 }
168
169 waiters [waiters_count++] = ctx;
170
171 s_epipe_signal (&ep);
172 while (!ctx->wait_f)
173 X_COND_WAIT (ctx->wait_c, wait_m);
174 X_UNLOCK (wait_m);
175
176 tctx_put (ctx);
177 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
178 }
179
180 static void
181 set_enable_0 (pTHX)
182 {
183 thread_enable = 0;
184 }
185
186 static void
187 set_enable_1 (pTHX)
188 {
189 thread_enable = 1;
190 }
191
192 static void
193 set_enable_2 (pTHX)
194 {
195 thread_enable = 2;
196 }
197
198 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
199
200 PROTOTYPES: DISABLE
201
202 BOOT:
203 {
204 #ifndef _WIN32
205 sigfillset (&fullsigset);
206 #endif
207
208 pthread_key_create (&current_key, 0);
209
210 if (s_epipe_new (&ep))
211 croak ("Coro::Multicore: unable to initialise event pipe.\n");
212
213 perl_thx = PERL_GET_CONTEXT;
214
215 I_CORO_API ("Coro::Multicore");
216
217 /* not perfectly efficient to do it this way, but it's simple */
218 perl_multicore_init ();
219 perl_multicore_api->pmapi_release = pmapi_release;
220 perl_multicore_api->pmapi_acquire = pmapi_acquire;
221 }
222
223 bool
224 enable (bool enable = NO_INIT)
225 CODE:
226 RETVAL = global_enable;
227 if (items)
228 global_enable = enable;
229 OUTPUT:
230 RETVAL
231
232 void
233 scoped_enable ()
234 CODE:
235 LEAVE; /* see Guard.xs */
236 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0);
237 ENTER; /* see Guard.xs */
238
239 void
240 scoped_disable ()
241 CODE:
242 LEAVE; /* see Guard.xs */
243 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0);
244 ENTER; /* see Guard.xs */
245
246 U32
247 max_idle_threads (U32 max = NO_INIT)
248 CODE:
249 X_LOCK (wait_m);
250 RETVAL = max_idle;
251 if (items)
252 max_idle = max;
253 X_UNLOCK (wait_m);
254 OUTPUT:
255 RETVAL
256
257
258 int
259 fd ()
260 CODE:
261 RETVAL = s_epipe_fd (&ep);
262 OUTPUT:
263 RETVAL
264
265 void
266 poll (...)
267 CODE:
268 s_epipe_drain (&ep);
269 X_LOCK (wait_m);
270 while (waiters_count)
271 {
272 struct tctx *ctx = waiters [--waiters_count];
273 CORO_READY ((SV *)ctx->coro);
274 SvREFCNT_dec_NN ((SV *)ctx->coro);
275 ctx->coro = 0;
276 }
277 X_UNLOCK (wait_m);
278
279 void
280 sleep (NV seconds)
281 CODE:
282 perlinterp_release ();
283 usleep (seconds * 1e6);
284 perlinterp_acquire ();
285