ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.6
Committed: Mon Jun 29 13:15:53 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.5: +42 -38 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
25 root 1.6 static int global_enable = 0;
26 root 1.4 static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27    
28 root 1.6 /* assigned to a thread for each release/acquire */
29 root 1.1 struct tctx
30     {
31     void *coro;
32 root 1.6 int wait_f;
33 root 1.3 xcond_t wait_c;
34 root 1.1 };
35    
36     static struct tctx *tctx_free;
37    
38 root 1.5 static int idle;
39 root 1.6 static int min_idle = 1;
40 root 1.1
41 root 1.3 static xmutex_t perl_m = X_MUTEX_INIT;
42     static xcond_t perl_c = X_COND_INIT;
43     static struct tctx *perl_f;
44    
45     static xmutex_t wait_m = X_MUTEX_INIT;
46 root 1.1
47     static int wakeup_f;
48     static struct tctx **waiters;
49     static int waiters_count, waiters_max;
50    
51     static struct tctx *
52     tctx_get (void)
53     {
54     struct tctx *ctx;
55    
56     if (!tctx_free)
57 root 1.3 {
58     ctx = malloc (sizeof (*tctx_free));
59     X_COND_CREATE (ctx->wait_c);
60     }
61 root 1.1 else
62     {
63     ctx = tctx_free;
64     tctx_free = tctx_free->coro;
65     }
66    
67     return ctx;
68     }
69    
70     static void
71     tctx_put (struct tctx *ctx)
72     {
73     ctx->coro = tctx_free;
74     tctx_free = ctx;
75     }
76    
77     X_THREAD_PROC(thread_proc)
78     {
79     PERL_SET_CONTEXT (perl_thx);
80    
81     {
82     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
83     struct tctx *ctx;
84    
85 root 1.6 X_LOCK (perl_m);
86    
87 root 1.1 for (;;)
88     {
89     while (!perl_f)
90 root 1.6 if (idle <= min_idle || 1)
91     X_COND_WAIT (perl_c, perl_m);
92     else
93     {
94     struct timespec ts = { time (0) + idle - min_idle, 0 };
95    
96     if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT)
97     if (idle > min_idle && !perl_f)
98     break;
99     }
100    
101 root 1.1 ctx = perl_f;
102     perl_f = 0;
103 root 1.6 --idle;
104 root 1.3 X_UNLOCK (perl_m);
105    
106 root 1.6 if (!ctx) /* timed out? */
107     break;
108    
109 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
110    
111     while (ctx->coro)
112     CORO_SCHEDULE;
113    
114 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
115    
116 root 1.1 X_LOCK (wait_m);
117 root 1.3 ctx->wait_f = 1;
118     X_COND_SIGNAL (ctx->wait_c);
119 root 1.6 X_UNLOCK (wait_m);
120 root 1.1
121 root 1.6 X_LOCK (perl_m);
122     ++idle;
123 root 1.1 }
124     }
125     }
126    
127     static void
128     start_thread (void)
129     {
130     xthread_t tid;
131    
132 root 1.6 ++idle;
133 root 1.1 xthread_create (&tid, thread_proc, 0);
134     }
135    
136     static void
137     pmapi_release (void)
138     {
139 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
140     {
141     pthread_setspecific (current_key, 0);
142     return;
143     }
144    
145 root 1.1 struct tctx *ctx = tctx_get ();
146     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
147 root 1.3 ctx->wait_f = 0;
148    
149 root 1.1 pthread_setspecific (current_key, ctx);
150     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
151    
152 root 1.6 X_LOCK (perl_m);
153    
154     if (idle <= min_idle)
155 root 1.1 start_thread ();
156    
157     perl_f = ctx;
158     X_COND_SIGNAL (perl_c);
159 root 1.6
160 root 1.1 X_UNLOCK (perl_m);
161     }
162    
163     static void
164     pmapi_acquire (void)
165     {
166     struct tctx *ctx = pthread_getspecific (current_key);
167    
168 root 1.4 if (!ctx)
169     return;
170    
171 root 1.1 X_LOCK (wait_m);
172    
173     if (waiters_count >= waiters_max)
174     {
175     waiters_max = waiters_max ? waiters_max * 2 : 16;
176     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
177     }
178    
179     waiters [waiters_count++] = ctx;
180    
181     s_epipe_signal (&ep);
182 root 1.3 while (!ctx->wait_f)
183     X_COND_WAIT (ctx->wait_c, wait_m);
184 root 1.1 X_UNLOCK (wait_m);
185    
186     tctx_put (ctx);
187     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
188     }
189    
190 root 1.4 static void
191 root 1.6 set_thread_enable (pTHX_ void *arg)
192 root 1.4 {
193 root 1.6 thread_enable = PTR2IV (arg);
194 root 1.4 }
195    
196 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
197    
198     PROTOTYPES: DISABLE
199    
200     BOOT:
201     {
202     #ifndef _WIN32
203     sigfillset (&fullsigset);
204     #endif
205    
206     pthread_key_create (&current_key, 0);
207    
208     if (s_epipe_new (&ep))
209     croak ("Coro::Multicore: unable to initialise event pipe.\n");
210    
211     perl_thx = PERL_GET_CONTEXT;
212    
213     I_CORO_API ("Coro::Multicore");
214    
215 root 1.6 X_LOCK (perl_m);
216     while (idle < min_idle)
217     start_thread ();
218     start_thread ();//D
219     X_UNLOCK (perl_m);
220    
221 root 1.1 /* not perfectly efficient to do it this way, but it's simple */
222     perl_multicore_init ();
223     perl_multicore_api->pmapi_release = pmapi_release;
224     perl_multicore_api->pmapi_acquire = pmapi_acquire;
225     }
226    
227 root 1.4 bool
228     enable (bool enable = NO_INIT)
229     CODE:
230     RETVAL = global_enable;
231     if (items)
232     global_enable = enable;
233     OUTPUT:
234     RETVAL
235    
236     void
237     scoped_enable ()
238     CODE:
239     LEAVE; /* see Guard.xs */
240 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
241 root 1.4 ENTER; /* see Guard.xs */
242    
243     void
244     scoped_disable ()
245     CODE:
246     LEAVE; /* see Guard.xs */
247 root 1.6 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
248 root 1.4 ENTER; /* see Guard.xs */
249    
250 root 1.1 U32
251 root 1.6 min_idle_threads (U32 min = NO_INIT)
252 root 1.1 CODE:
253     X_LOCK (wait_m);
254 root 1.6 RETVAL = min_idle;
255 root 1.1 if (items)
256 root 1.6 min_idle = min;
257 root 1.1 X_UNLOCK (wait_m);
258     OUTPUT:
259     RETVAL
260    
261    
262     int
263     fd ()
264     CODE:
265     RETVAL = s_epipe_fd (&ep);
266     OUTPUT:
267     RETVAL
268    
269     void
270     poll (...)
271     CODE:
272     s_epipe_drain (&ep);
273     X_LOCK (wait_m);
274     while (waiters_count)
275     {
276     struct tctx *ctx = waiters [--waiters_count];
277     CORO_READY ((SV *)ctx->coro);
278     SvREFCNT_dec_NN ((SV *)ctx->coro);
279     ctx->coro = 0;
280     }
281     X_UNLOCK (wait_m);
282    
283     void
284 root 1.2 sleep (NV seconds)
285 root 1.1 CODE:
286     perlinterp_release ();
287 root 1.2 usleep (seconds * 1e6);
288 root 1.1 perlinterp_acquire ();
289 root 1.2