ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.4
Committed: Sun Jun 28 17:39:42 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.3: +53 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
#ifdef _WIN32
/* Windows build: supply a dummy sigset_t and turn pthread_sigmask into an
 * empty macro, so the signal-mask calls below compile to nothing. */
typedef char sigset_t;
#define pthread_sigmask(mode,new,old)
#endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
25 root 1.4 static int global_enable = 1;
26     static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27    
28 root 1.1 struct tctx
29     {
30     void *coro;
31 root 1.3 xcond_t wait_c;
32     int wait_f;
33 root 1.1 };
34    
35     static struct tctx *tctx_free;
36    
37     static int available;
38     static int max_idle = 8;
39    
40 root 1.3 static xmutex_t perl_m = X_MUTEX_INIT;
41     static xcond_t perl_c = X_COND_INIT;
42     static struct tctx *perl_f;
43    
44     static xmutex_t wait_m = X_MUTEX_INIT;
45 root 1.1
46     static int wakeup_f;
47     static struct tctx **waiters;
48     static int waiters_count, waiters_max;
49    
50     static struct tctx *
51     tctx_get (void)
52     {
53     struct tctx *ctx;
54    
55     if (!tctx_free)
56 root 1.3 {
57     ctx = malloc (sizeof (*tctx_free));
58     X_COND_CREATE (ctx->wait_c);
59     }
60 root 1.1 else
61     {
62     ctx = tctx_free;
63     tctx_free = tctx_free->coro;
64     }
65    
66     return ctx;
67     }
68    
69     static void
70     tctx_put (struct tctx *ctx)
71     {
72     ctx->coro = tctx_free;
73     tctx_free = ctx;
74     }
75    
76     X_THREAD_PROC(thread_proc)
77     {
78     PERL_SET_CONTEXT (perl_thx);
79    
80     {
81     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
82     struct tctx *ctx;
83    
84     for (;;)
85     {
86 root 1.2 /* TODO: should really use some idle time and exit after that */
87 root 1.1 X_LOCK (perl_m);
88     while (!perl_f)
89     X_COND_WAIT (perl_c, perl_m);
90     ctx = perl_f;
91     perl_f = 0;
92     --available;
93 root 1.3 X_UNLOCK (perl_m);
94    
95 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
96    
97     while (ctx->coro)
98     CORO_SCHEDULE;
99    
100 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
101    
102 root 1.1 X_LOCK (wait_m);
103 root 1.3 ctx->wait_f = 1;
104     X_COND_SIGNAL (ctx->wait_c);
105 root 1.1
106     if (available >= max_idle)
107     {
108     X_UNLOCK (wait_m);
109     break;
110     }
111    
112     ++available;
113     X_UNLOCK (wait_m);
114     }
115     }
116     }
117    
118     static void
119     start_thread (void)
120     {
121     xthread_t tid;
122    
123     ++available;
124     xthread_create (&tid, thread_proc, 0);
125     }
126    
127     static void
128     pmapi_release (void)
129     {
130 root 1.4 if (!(thread_enable ? thread_enable & 1 : global_enable))
131     {
132     pthread_setspecific (current_key, 0);
133     return;
134     }
135    
136 root 1.1 struct tctx *ctx = tctx_get ();
137     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
138 root 1.3 ctx->wait_f = 0;
139    
140 root 1.1 pthread_setspecific (current_key, ctx);
141     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
142    
143     if (!available)
144     start_thread ();
145    
146     X_LOCK (perl_m);
147     perl_f = ctx;
148     X_COND_SIGNAL (perl_c);
149     X_UNLOCK (perl_m);
150     }
151    
152     static void
153     pmapi_acquire (void)
154     {
155     struct tctx *ctx = pthread_getspecific (current_key);
156    
157 root 1.4 if (!ctx)
158     return;
159    
160 root 1.1 X_LOCK (wait_m);
161    
162     if (waiters_count >= waiters_max)
163     {
164     waiters_max = waiters_max ? waiters_max * 2 : 16;
165     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
166     }
167    
168     waiters [waiters_count++] = ctx;
169    
170     s_epipe_signal (&ep);
171 root 1.3 while (!ctx->wait_f)
172     X_COND_WAIT (ctx->wait_c, wait_m);
173 root 1.1 X_UNLOCK (wait_m);
174    
175     tctx_put (ctx);
176     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
177     }
178    
179 root 1.4 static void
180     set_enable_0 (pTHX)
181     {
182     thread_enable = 0;
183     }
184    
185     static void
186     set_enable_1 (pTHX)
187     {
188     thread_enable = 1;
189     }
190    
191     static void
192     set_enable_2 (pTHX)
193     {
194     thread_enable = 2;
195     }
196    
197 root 1.1 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
198    
199     PROTOTYPES: DISABLE
200    
BOOT:
{
  /* module start-up: build the all-signals mask, create the per-thread key
   * and the event pipe, remember the interpreter context, load the Coro API
   * and install our release/acquire hooks */
#ifndef _WIN32
  sigfillset (&fullsigset);
#endif

  pthread_key_create (&current_key, 0);

  if (s_epipe_new (&ep))
    croak ("Coro::Multicore: unable to initialise event pipe.\n");

  /* handed to PERL_SET_CONTEXT by every worker thread */
  perl_thx = PERL_GET_CONTEXT;

  I_CORO_API ("Coro::Multicore");

  /* not perfectly efficient to do it this way, but it's simple */
  perl_multicore_init ();
  perl_multicore_api->pmapi_release = pmapi_release;
  perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
221    
bool
enable (bool enable = NO_INIT)
        CODE:
        /* returns the previous value of global_enable; when called with an
         * argument, that argument becomes the new value */
        RETVAL = global_enable;
        if (items != 0)
          global_enable = enable;
        OUTPUT:
        RETVAL
230    
void
scoped_enable ()
        CODE:
        /* registers set_enable_1 / set_enable_0 as a scope hook pair;
         * presumably the LEAVE/ENTER bracket attaches the hooks to the
         * caller's scope rather than the XSUB's own - see Guard.xs */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0);
        ENTER; /* see Guard.xs */
237    
void
scoped_disable ()
        CODE:
        /* registers set_enable_2 / set_enable_0 as a scope hook pair;
         * presumably the LEAVE/ENTER bracket attaches the hooks to the
         * caller's scope rather than the XSUB's own - see Guard.xs */
        LEAVE; /* see Guard.xs */
        CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0);
        ENTER; /* see Guard.xs */
244    
U32
max_idle_threads (U32 max = NO_INIT)
        CODE:
        /* returns the previous max_idle; when called with an argument, that
         * argument becomes the new limit. wait_m is held because worker
         * threads read max_idle under the same mutex. */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items != 0)
          max_idle = max;
        X_UNLOCK (wait_m);
        OUTPUT:
        RETVAL
255    
256    
int
fd ()
        CODE:
        /* file descriptor of the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
        OUTPUT:
        RETVAL
263    
void
poll (...)
        CODE:
        /* drain the event pipe, then ready every queued coroutine (most
         * recently queued first), drop the reference taken in pmapi_release
         * and clear ctx->coro so the worker's scheduling loop can end */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count)
          {
            struct tctx *waiter = waiters [--waiters_count];
            CORO_READY ((SV *)waiter->coro);
            SvREFCNT_dec_NN ((SV *)waiter->coro);
            waiter->coro = 0;
          }
        X_UNLOCK (wait_m);
277    
void
sleep (NV seconds)
        CODE:
        /* sleep for the given (possibly fractional) number of seconds with
         * the interpreter released. The interval is slept in chunks below
         * one second, because usleep is only specified for arguments under
         * 1000000; zero, negative and NaN values do not sleep at all
         * instead of being converted to an unsigned microsecond count.
         * A failing usleep (e.g. interrupted by a signal) ends the sleep
         * early. */
        perlinterp_release ();
        {
          int interrupted = 0;
          while (!interrupted && seconds >= 0.5)
            {
              interrupted = usleep (500000) != 0;
              seconds -= 0.5;
            }
          if (!interrupted && seconds > 0.)
            usleep (seconds * 1e6);
        }
        perlinterp_acquire ();
284 root 1.2