ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.8
Committed: Wed Jul 1 22:39:07 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.7: +8 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7 root 1.7 #define X_STACKSIZE 1024 * sizeof (void *)
8 root 1.1
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19 root 1.8 #ifndef SvREFCNT_dec_NN
20     #define SvREFCNT_dec_NN(sv) SvREFCNT_dec (sv)
21     #endif
22    
23     #ifndef SvREFCNT_inc_NN
24     #define SvREFCNT_inc_NN(sv) SvREFCNT_inc (sv)
25     #endif
26    
27 root 1.1 static pthread_key_t current_key;
28    
29     static s_epipe ep;
30     static void *perl_thx;
31     static sigset_t cursigset, fullsigset;
32    
33 root 1.6 static int global_enable = 0;
34 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
35    
36 root 1.6 /* assigned to a thread for each release/acquire */
37 root 1.1 struct tctx
38     {
39     void *coro;
40 root 1.6 int wait_f;
41 root 1.3 xcond_t wait_c;
42 root 1.1 };
43    
44     static struct tctx *tctx_free;
45    
46 root 1.5 static int idle;
47 root 1.6 static int min_idle = 1;
48 root 1.1
49 root 1.3 static xmutex_t perl_m = X_MUTEX_INIT;
50     static xcond_t perl_c = X_COND_INIT;
51     static struct tctx *perl_f;
52    
53     static xmutex_t wait_m = X_MUTEX_INIT;
54 root 1.1
55     static int wakeup_f;
56     static struct tctx **waiters;
57     static int waiters_count, waiters_max;
58    
59     static struct tctx *
60     tctx_get (void)
61     {
62     struct tctx *ctx;
63    
64     if (!tctx_free)
65 root 1.3 {
66     ctx = malloc (sizeof (*tctx_free));
67     X_COND_CREATE (ctx->wait_c);
68     }
69 root 1.1 else
70     {
71     ctx = tctx_free;
72     tctx_free = tctx_free->coro;
73     }
74    
75     return ctx;
76     }
77    
78     static void
79     tctx_put (struct tctx *ctx)
80     {
81     ctx->coro = tctx_free;
82     tctx_free = ctx;
83     }
84    
85     X_THREAD_PROC(thread_proc)
86     {
87     PERL_SET_CONTEXT (perl_thx);
88    
89     {
90     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
91     struct tctx *ctx;
92    
93 root 1.6 X_LOCK (perl_m);
94    
95 root 1.1 for (;;)
96     {
97     while (!perl_f)
98 root 1.6 if (idle <= min_idle || 1)
99     X_COND_WAIT (perl_c, perl_m);
100     else
101     {
102     struct timespec ts = { time (0) + idle - min_idle, 0 };
103    
104     if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT)
105     if (idle > min_idle && !perl_f)
106     break;
107     }
108    
109 root 1.1 ctx = perl_f;
110     perl_f = 0;
111 root 1.6 --idle;
112 root 1.3 X_UNLOCK (perl_m);
113    
114 root 1.6 if (!ctx) /* timed out? */
115     break;
116    
117 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
118    
119     while (ctx->coro)
120     CORO_SCHEDULE;
121    
122 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
123    
124 root 1.1 X_LOCK (wait_m);
125 root 1.3 ctx->wait_f = 1;
126     X_COND_SIGNAL (ctx->wait_c);
127 root 1.6 X_UNLOCK (wait_m);
128 root 1.1
129 root 1.6 X_LOCK (perl_m);
130     ++idle;
131 root 1.1 }
132     }
133     }
134    
135     static void
136     start_thread (void)
137     {
138     xthread_t tid;
139    
140 root 1.6 ++idle;
141 root 1.1 xthread_create (&tid, thread_proc, 0);
142     }
143    
144     static void
145     pmapi_release (void)
146     {
147 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
148     {
149     pthread_setspecific (current_key, 0);
150     return;
151     }
152    
153 root 1.1 struct tctx *ctx = tctx_get ();
154     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
155 root 1.3 ctx->wait_f = 0;
156    
157 root 1.1 pthread_setspecific (current_key, ctx);
158     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
159    
160 root 1.6 X_LOCK (perl_m);
161    
162     if (idle <= min_idle)
163 root 1.1 start_thread ();
164    
165     perl_f = ctx;
166     X_COND_SIGNAL (perl_c);
167 root 1.6
168 root 1.1 X_UNLOCK (perl_m);
169     }
170    
171     static void
172     pmapi_acquire (void)
173     {
174     struct tctx *ctx = pthread_getspecific (current_key);
175    
176 root 1.4 if (!ctx)
177     return;
178    
179 root 1.1 X_LOCK (wait_m);
180    
181     if (waiters_count >= waiters_max)
182     {
183     waiters_max = waiters_max ? waiters_max * 2 : 16;
184     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
185     }
186    
187     waiters [waiters_count++] = ctx;
188    
189     s_epipe_signal (&ep);
190 root 1.3 while (!ctx->wait_f)
191     X_COND_WAIT (ctx->wait_c, wait_m);
192 root 1.1 X_UNLOCK (wait_m);
193    
194     tctx_put (ctx);
195     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
196     }
197    
198 root 1.4 static void
199 root 1.6 set_thread_enable (pTHX_ void *arg)
200 root 1.4 {
201 root 1.6 thread_enable = PTR2IV (arg);
202 root 1.4 }
203    
204 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
205    
206     PROTOTYPES: DISABLE
207    
208     BOOT:
209     {
210     #ifndef _WIN32
211     sigfillset (&fullsigset);
212     #endif
213    
214     pthread_key_create (&current_key, 0);
215    
216     if (s_epipe_new (&ep))
217     croak ("Coro::Multicore: unable to initialise event pipe.\n");
218    
219     perl_thx = PERL_GET_CONTEXT;
220    
221     I_CORO_API ("Coro::Multicore");
222    
223 root 1.6 X_LOCK (perl_m);
224     while (idle < min_idle)
225     start_thread ();
226     start_thread ();//D
227     X_UNLOCK (perl_m);
228    
229 root 1.1 /* not perfectly efficient to do it this way, but it's simple */
230     perl_multicore_init ();
231     perl_multicore_api->pmapi_release = pmapi_release;
232     perl_multicore_api->pmapi_acquire = pmapi_acquire;
233     }
234    
235 root 1.4 bool
236     enable (bool enable = NO_INIT)
237     CODE:
238     RETVAL = global_enable;
239     if (items)
240     global_enable = enable;
241     OUTPUT:
242     RETVAL
243    
244     void
245     scoped_enable ()
246     CODE:
247     LEAVE; /* see Guard.xs */
248 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
249 root 1.4 ENTER; /* see Guard.xs */
250    
251     void
252     scoped_disable ()
253     CODE:
254     LEAVE; /* see Guard.xs */
255 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
256 root 1.4 ENTER; /* see Guard.xs */
257    
258 root 1.1 U32
259 root 1.6 min_idle_threads (U32 min = NO_INIT)
260 root 1.1 CODE:
261     X_LOCK (wait_m);
262 root 1.6 RETVAL = min_idle;
263 root 1.1 if (items)
264 root 1.6 min_idle = min;
265 root 1.1 X_UNLOCK (wait_m);
266     OUTPUT:
267     RETVAL
268    
269    
270     int
271     fd ()
272     CODE:
273     RETVAL = s_epipe_fd (&ep);
274     OUTPUT:
275     RETVAL
276    
277     void
278     poll (...)
279     CODE:
280     s_epipe_drain (&ep);
281     X_LOCK (wait_m);
282     while (waiters_count)
283     {
284     struct tctx *ctx = waiters [--waiters_count];
285     CORO_READY ((SV *)ctx->coro);
286     SvREFCNT_dec_NN ((SV *)ctx->coro);
287     ctx->coro = 0;
288     }
289     X_UNLOCK (wait_m);
290    
291     void
292 root 1.2 sleep (NV seconds)
293 root 1.1 CODE:
294     perlinterp_release ();
295 root 1.2 usleep (seconds * 1e6);
296 root 1.1 perlinterp_acquire ();
297 root 1.2