ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.5
Committed: Sun Jun 28 18:43:17 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.4: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
25 root 1.4 static int global_enable = 1;
26     static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27    
28 root 1.1 struct tctx
29     {
30     void *coro;
31 root 1.3 xcond_t wait_c;
32     int wait_f;
33 root 1.1 };
34    
35     static struct tctx *tctx_free;
36    
37 root 1.5 static int idle_timeout;
38     static int idle;
39 root 1.1 static int max_idle = 8;
40    
41 root 1.3 static xmutex_t perl_m = X_MUTEX_INIT;
42     static xcond_t perl_c = X_COND_INIT;
43     static struct tctx *perl_f;
44    
45     static xmutex_t wait_m = X_MUTEX_INIT;
46 root 1.1
47     static int wakeup_f;
48     static struct tctx **waiters;
49     static int waiters_count, waiters_max;
50    
51     static struct tctx *
52     tctx_get (void)
53     {
54     struct tctx *ctx;
55    
56     if (!tctx_free)
57 root 1.3 {
58     ctx = malloc (sizeof (*tctx_free));
59     X_COND_CREATE (ctx->wait_c);
60     }
61 root 1.1 else
62     {
63     ctx = tctx_free;
64     tctx_free = tctx_free->coro;
65     }
66    
67     return ctx;
68     }
69    
70     static void
71     tctx_put (struct tctx *ctx)
72     {
73     ctx->coro = tctx_free;
74     tctx_free = ctx;
75     }
76    
77     X_THREAD_PROC(thread_proc)
78     {
79     PERL_SET_CONTEXT (perl_thx);
80    
81     {
82     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
83     struct tctx *ctx;
84    
85     for (;;)
86     {
87 root 1.2 /* TODO: should really use some idle time and exit after that */
88 root 1.1 X_LOCK (perl_m);
89     while (!perl_f)
90     X_COND_WAIT (perl_c, perl_m);
91     ctx = perl_f;
92     perl_f = 0;
93     --available;
94 root 1.3 X_UNLOCK (perl_m);
95    
96 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
97    
98     while (ctx->coro)
99     CORO_SCHEDULE;
100    
101 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
102    
103 root 1.1 X_LOCK (wait_m);
104 root 1.3 ctx->wait_f = 1;
105     X_COND_SIGNAL (ctx->wait_c);
106 root 1.1
107     if (available >= max_idle)
108     {
109     X_UNLOCK (wait_m);
110     break;
111     }
112    
113     ++available;
114     X_UNLOCK (wait_m);
115     }
116     }
117     }
118    
119     static void
120     start_thread (void)
121     {
122     xthread_t tid;
123    
124     ++available;
125     xthread_create (&tid, thread_proc, 0);
126     }
127    
128     static void
129     pmapi_release (void)
130     {
131 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
132     {
133     pthread_setspecific (current_key, 0);
134     return;
135     }
136    
137 root 1.1 struct tctx *ctx = tctx_get ();
138     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
139 root 1.3 ctx->wait_f = 0;
140    
141 root 1.1 pthread_setspecific (current_key, ctx);
142     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
143    
144     if (!available)
145     start_thread ();
146    
147     X_LOCK (perl_m);
148     perl_f = ctx;
149     X_COND_SIGNAL (perl_c);
150     X_UNLOCK (perl_m);
151     }
152    
153     static void
154     pmapi_acquire (void)
155     {
156     struct tctx *ctx = pthread_getspecific (current_key);
157    
158 root 1.4 if (!ctx)
159     return;
160    
161 root 1.1 X_LOCK (wait_m);
162    
163     if (waiters_count >= waiters_max)
164     {
165     waiters_max = waiters_max ? waiters_max * 2 : 16;
166     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
167     }
168    
169     waiters [waiters_count++] = ctx;
170    
171     s_epipe_signal (&ep);
172 root 1.3 while (!ctx->wait_f)
173     X_COND_WAIT (ctx->wait_c, wait_m);
174 root 1.1 X_UNLOCK (wait_m);
175    
176     tctx_put (ctx);
177     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
178     }
179    
180 root 1.4 static void
181     set_enable_0 (pTHX)
182     {
183     thread_enable = 0;
184     }
185    
186     static void
187     set_enable_1 (pTHX)
188     {
189     thread_enable = 1;
190     }
191    
192     static void
193     set_enable_2 (pTHX)
194     {
195     thread_enable = 2;
196     }
197    
198 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
199    
200     PROTOTYPES: DISABLE
201    
202     BOOT:
203     {
204     #ifndef _WIN32
205     sigfillset (&fullsigset);
206     #endif
207    
208     pthread_key_create (&current_key, 0);
209    
210     if (s_epipe_new (&ep))
211     croak ("Coro::Multicore: unable to initialise event pipe.\n");
212    
213     perl_thx = PERL_GET_CONTEXT;
214    
215     I_CORO_API ("Coro::Multicore");
216    
217     /* not perfectly efficient to do it this way, but it's simple */
218     perl_multicore_init ();
219     perl_multicore_api->pmapi_release = pmapi_release;
220     perl_multicore_api->pmapi_acquire = pmapi_acquire;
221     }
222    
223 root 1.4 bool
224     enable (bool enable = NO_INIT)
225     CODE:
226     RETVAL = global_enable;
227     if (items)
228     global_enable = enable;
229     OUTPUT:
230     RETVAL
231    
232     void
233     scoped_enable ()
234     CODE:
235     LEAVE; /* see Guard.xs */
236     CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0);
237     ENTER; /* see Guard.xs */
238    
239     void
240     scoped_disable ()
241     CODE:
242     LEAVE; /* see Guard.xs */
243     CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0);
244     ENTER; /* see Guard.xs */
245    
246 root 1.1 U32
247     max_idle_threads (U32 max = NO_INIT)
248     CODE:
249     X_LOCK (wait_m);
250     RETVAL = max_idle;
251     if (items)
252     max_idle = max;
253     X_UNLOCK (wait_m);
254     OUTPUT:
255     RETVAL
256    
257    
258     int
259     fd ()
260     CODE:
261     RETVAL = s_epipe_fd (&ep);
262     OUTPUT:
263     RETVAL
264    
265     void
266     poll (...)
267     CODE:
268     s_epipe_drain (&ep);
269     X_LOCK (wait_m);
270     while (waiters_count)
271     {
272     struct tctx *ctx = waiters [--waiters_count];
273     CORO_READY ((SV *)ctx->coro);
274     SvREFCNT_dec_NN ((SV *)ctx->coro);
275     ctx->coro = 0;
276     }
277     X_UNLOCK (wait_m);
278    
279     void
280 root 1.2 sleep (NV seconds)
281 root 1.1 CODE:
282     perlinterp_release ();
283 root 1.2 usleep (seconds * 1e6);
284 root 1.1 perlinterp_acquire ();
285 root 1.2