ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.4
Committed: Sun Jun 28 17:39:42 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.3: +53 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
25 static int global_enable = 1;
26 static int thread_enable; /* 0 undefined, 1 enabled, 2 disabled */
27
/* context describing one outstanding release; passed from pmapi_release to a worker via perl_f */
28 struct tctx
29 {
30 void *coro; /* coro SV reference while in use (set in pmapi_release, cleared in poll); link to the next entry while on tctx_free */
31 xcond_t wait_c; /* waited on in pmapi_acquire, signalled by thread_proc */
32 int wait_f; /* cleared in pmapi_release, set to 1 by thread_proc before it signals wait_c */
33 };
34
35 static struct tctx *tctx_free;
36
37 static int available;
38 static int max_idle = 8;
39
40 static xmutex_t perl_m = X_MUTEX_INIT;
41 static xcond_t perl_c = X_COND_INIT;
42 static struct tctx *perl_f;
43
44 static xmutex_t wait_m = X_MUTEX_INIT;
45
46 static int wakeup_f;
47 static struct tctx **waiters;
48 static int waiters_count, waiters_max;
49
50 static struct tctx *
51 tctx_get (void)
52 {
53 struct tctx *ctx;
54
55 if (!tctx_free)
56 {
57 ctx = malloc (sizeof (*tctx_free));
58 X_COND_CREATE (ctx->wait_c);
59 }
60 else
61 {
62 ctx = tctx_free;
63 tctx_free = tctx_free->coro;
64 }
65
66 return ctx;
67 }
68
69 static void
70 tctx_put (struct tctx *ctx)
71 {
72 ctx->coro = tctx_free;
73 tctx_free = ctx;
74 }
75
76 X_THREAD_PROC(thread_proc)
77 {
78 PERL_SET_CONTEXT (perl_thx);
79
80 {
81 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
82 struct tctx *ctx;
83
84 for (;;)
85 {
86 /* TODO: should really use some idle time and exit after that */
87 X_LOCK (perl_m);
88 while (!perl_f)
89 X_COND_WAIT (perl_c, perl_m);
90 ctx = perl_f;
91 perl_f = 0;
92 --available;
93 X_UNLOCK (perl_m);
94
95 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
96
97 while (ctx->coro)
98 CORO_SCHEDULE;
99
100 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
101
102 X_LOCK (wait_m);
103 ctx->wait_f = 1;
104 X_COND_SIGNAL (ctx->wait_c);
105
106 if (available >= max_idle)
107 {
108 X_UNLOCK (wait_m);
109 break;
110 }
111
112 ++available;
113 X_UNLOCK (wait_m);
114 }
115 }
116 }
117
118 static void
119 start_thread (void)
120 {
121 xthread_t tid;
122
123 ++available;
124 xthread_create (&tid, thread_proc, 0);
125 }
126
127 static void
128 pmapi_release (void)
129 {
130 if (!(thread_enable ? thread_enable & 1 : global_enable))
131 {
132 pthread_setspecific (current_key, 0);
133 return;
134 }
135
136 struct tctx *ctx = tctx_get ();
137 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
138 ctx->wait_f = 0;
139
140 pthread_setspecific (current_key, ctx);
141 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
142
143 if (!available)
144 start_thread ();
145
146 X_LOCK (perl_m);
147 perl_f = ctx;
148 X_COND_SIGNAL (perl_c);
149 X_UNLOCK (perl_m);
150 }
151
152 static void
153 pmapi_acquire (void)
154 {
155 struct tctx *ctx = pthread_getspecific (current_key);
156
157 if (!ctx)
158 return;
159
160 X_LOCK (wait_m);
161
162 if (waiters_count >= waiters_max)
163 {
164 waiters_max = waiters_max ? waiters_max * 2 : 16;
165 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
166 }
167
168 waiters [waiters_count++] = ctx;
169
170 s_epipe_signal (&ep);
171 while (!ctx->wait_f)
172 X_COND_WAIT (ctx->wait_c, wait_m);
173 X_UNLOCK (wait_m);
174
175 tctx_put (ctx);
176 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
177 }
178
179 static void
180 set_enable_0 (pTHX)
181 {
182 thread_enable = 0;
183 }
184
185 static void
186 set_enable_1 (pTHX)
187 {
188 thread_enable = 1;
189 }
190
191 static void
192 set_enable_2 (pTHX)
193 {
194 thread_enable = 2;
195 }
196
197 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
198
199 PROTOTYPES: DISABLE
200
BOOT:
{
#ifndef _WIN32
        /* mask with every signal set, installed by pmapi_release and thread_proc */
        sigfillset (&fullsigset);
#endif

        /* thread-specific slot carrying the context from pmapi_release to pmapi_acquire */
        if (pthread_key_create (&current_key, 0))
          croak ("Coro::Multicore: unable to create thread-specific key.\n");

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so that worker threads can PERL_SET_CONTEXT to it */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
221
bool
enable (bool enable = NO_INIT)
        CODE:
        /* always hand back the previous global setting;
           replace it only when an argument was passed */
        RETVAL = global_enable;
        if (items > 0)
          global_enable = enable;
        OUTPUT:
        RETVAL
230
void
scoped_enable ()
        CODE:
        /* The LEAVE/ENTER pair brackets the hook registration (see Guard.xs),
           presumably so that the hook ends up in the caller's scope.
           set_enable_1 and set_enable_0 are the two hook callbacks. */
        LEAVE;
        CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0);
        ENTER;
237
void
scoped_disable ()
        CODE:
        /* The LEAVE/ENTER pair brackets the hook registration (see Guard.xs),
           presumably so that the hook ends up in the caller's scope.
           set_enable_2 and set_enable_0 are the two hook callbacks. */
        LEAVE;
        CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0);
        ENTER;
244
U32
max_idle_threads (U32 max = NO_INIT)
        CODE:
        /* max_idle is read by thread_proc while it holds wait_m,
           so it is read and replaced under the same mutex */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          max_idle = max;
        X_UNLOCK (wait_m);
        OUTPUT:
        RETVAL
255
256
int
fd ()
        CODE:
        /* descriptor of the event pipe that pmapi_acquire signals and poll drains */
        RETVAL = s_epipe_fd (&ep);
        OUTPUT:
        RETVAL
263
void
poll (...)
        CODE:
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        /* work through the waiters registered by pmapi_acquire, last one first */
        while (waiters_count > 0)
          {
            struct tctx *ctx = waiters [waiters_count - 1];
            SV *coro_sv = (SV *)ctx->coro;

            waiters_count -= 1;
            /* ready the coro, then drop the reference taken in pmapi_release */
            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            /* a cleared coro member ends the CORO_SCHEDULE loop in thread_proc */
            ctx->coro = 0;
          }
        X_UNLOCK (wait_m);
277
void
sleep (NV seconds)
        CODE:
        perlinterp_release ();
        /* usleep is only specified for arguments below one million, and
           converting a negative, NaN or very large NV to its unsigned argument
           is undefined. So whole seconds go to sleep () and only the remaining
           fraction goes to usleep (); non-positive and NaN values do not sleep. */
        if (seconds > 0.)
          {
            unsigned int whole = seconds < 2147483647. ? (unsigned int)seconds : 2147483647U;
            NV fraction = seconds - whole;
            unsigned int usec = fraction < 1. ? (unsigned int)(fraction * 1e6) : 0;

            if (usec > 999999)
              usec = 999999;

            if (whole)
              sleep (whole);

            if (usec)
              usleep (usec);
          }
        perlinterp_acquire ();
284