ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.3
Committed: Sun Jun 28 08:04:02 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.2: +21 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define PERL_NO_GET_CONTEXT
2    
3     #include "EXTERN.h"
4     #include "perl.h"
5     #include "XSUB.h"
6    
7     #define X_STACKSIZE -1
8    
9     #include "CoroAPI.h"
10     #include "perlmulticore.h"
11     #include "schmorp.h"
12     #include "xthread.h"
13    
14     #ifdef _WIN32
15     typedef char sigset_t;
16     #define pthread_sigmask(mode,new,old)
17     #endif
18    
19     static pthread_key_t current_key;
20    
21     static s_epipe ep;
22     static void *perl_thx;
23     static sigset_t cursigset, fullsigset;
24    
25     struct tctx
26     {
27     void *coro;
28 root 1.3 xcond_t wait_c;
29     int wait_f;
30 root 1.1 };
31    
32     static struct tctx *tctx_free;
33    
34     static int available;
35     static int max_idle = 8;
36    
37 root 1.3 static xmutex_t perl_m = X_MUTEX_INIT;
38     static xcond_t perl_c = X_COND_INIT;
39     static struct tctx *perl_f;
40    
41     static xmutex_t wait_m = X_MUTEX_INIT;
42 root 1.1
43     static int wakeup_f;
44     static struct tctx **waiters;
45     static int waiters_count, waiters_max;
46    
47     static struct tctx *
48     tctx_get (void)
49     {
50     struct tctx *ctx;
51    
52     if (!tctx_free)
53 root 1.3 {
54     ctx = malloc (sizeof (*tctx_free));
55     X_COND_CREATE (ctx->wait_c);
56     }
57 root 1.1 else
58     {
59     ctx = tctx_free;
60     tctx_free = tctx_free->coro;
61     }
62    
63     return ctx;
64     }
65    
66     static void
67     tctx_put (struct tctx *ctx)
68     {
69     ctx->coro = tctx_free;
70     tctx_free = ctx;
71     }
72    
73     X_THREAD_PROC(thread_proc)
74     {
75     PERL_SET_CONTEXT (perl_thx);
76    
77     {
78     dTHX; /* inefficient, we already have perl_thx, but I see no better way */
79     struct tctx *ctx;
80    
81     for (;;)
82     {
83 root 1.2 /* TODO: should really use some idle time and exit after that */
84 root 1.1 X_LOCK (perl_m);
85     while (!perl_f)
86     X_COND_WAIT (perl_c, perl_m);
87     ctx = perl_f;
88     perl_f = 0;
89     --available;
90 root 1.3 X_UNLOCK (perl_m);
91    
92 root 1.1 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
93    
94     while (ctx->coro)
95     CORO_SCHEDULE;
96    
97 root 1.3 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
98    
99 root 1.1 X_LOCK (wait_m);
100 root 1.3 ctx->wait_f = 1;
101     X_COND_SIGNAL (ctx->wait_c);
102 root 1.1
103     if (available >= max_idle)
104     {
105     X_UNLOCK (wait_m);
106     break;
107     }
108    
109     ++available;
110     X_UNLOCK (wait_m);
111     }
112     }
113     }
114    
115     static void
116     start_thread (void)
117     {
118     xthread_t tid;
119    
120     ++available;
121     xthread_create (&tid, thread_proc, 0);
122     }
123    
124     static void
125     pmapi_release (void)
126     {
127     struct tctx *ctx = tctx_get ();
128     ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
129 root 1.3 ctx->wait_f = 0;
130    
131 root 1.1 pthread_setspecific (current_key, ctx);
132     pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
133    
134     if (!available)
135     start_thread ();
136    
137     X_LOCK (perl_m);
138     perl_f = ctx;
139     X_COND_SIGNAL (perl_c);
140     X_UNLOCK (perl_m);
141     }
142    
143     static void
144     pmapi_acquire (void)
145     {
146     struct tctx *ctx = pthread_getspecific (current_key);
147    
148     X_LOCK (wait_m);
149    
150     if (waiters_count >= waiters_max)
151     {
152     waiters_max = waiters_max ? waiters_max * 2 : 16;
153     waiters = realloc (waiters, waiters_max * sizeof (*waiters));
154     }
155    
156     waiters [waiters_count++] = ctx;
157    
158     s_epipe_signal (&ep);
159 root 1.3 while (!ctx->wait_f)
160     X_COND_WAIT (ctx->wait_c, wait_m);
161 root 1.1 X_UNLOCK (wait_m);
162    
163     tctx_put (ctx);
164     pthread_sigmask (SIG_SETMASK, &cursigset, 0);
165     }
166    
167     MODULE = Coro::Multicore PACKAGE = Coro::Multicore
168    
169     PROTOTYPES: DISABLE
170    
BOOT:
{
        /* one-time module setup: signal set, thread key, event pipe,
         * interpreter context, Coro API and the perlmulticore hooks */
#ifndef _WIN32
        /* fullsigset is the mask used to block every signal */
        sigfillset (&fullsigset);
#endif

        /* slot through which pmapi_release passes its context to pmapi_acquire */
        pthread_key_create (&current_key, 0);

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so that worker threads can install it */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
191    
U32
max_idle_threads (U32 max = NO_INIT)
	CODE:
        /* returns the current idle-thread limit; when an argument was
         * passed, it becomes the new limit. max_idle is read by the
         * worker threads under wait_m, hence the lock. */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          {
            max_idle = max;
          }
        X_UNLOCK (wait_m);
	OUTPUT:
        RETVAL
202    
203    
int
fd ()
	CODE:
        /* file descriptor of the event pipe that pmapi_acquire signals
         * and poll drains */
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL
210    
void
poll (...)
	CODE:
        /* readies the coro of every thread queued by pmapi_acquire */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count > 0)
          {
            /* take the most recently queued context */
            struct tctx *ctx = waiters [waiters_count - 1];
            SV *coro_sv = (SV *)ctx->coro;

            --waiters_count;

            CORO_READY (coro_sv);
            /* drop the reference taken in pmapi_release */
            SvREFCNT_dec_NN (coro_sv);
            /* a cleared ->coro ends the scheduling loop in thread_proc */
            ctx->coro = 0;
          }
        X_UNLOCK (wait_m);
224    
void
sleep (NV seconds)
	CODE:
        {
          /* Sleeps for the given number of (fractional) seconds with the
           * perl interpreter released.
           *
           * Negative or NaN durations are treated as zero: converting them
           * to the unsigned argument of usleep would be undefined. The
           * interpreter is still released and re-acquired in that case. */
          NV remaining = seconds > 0. ? seconds : 0.;

          perlinterp_release ();

          /* POSIX only guarantees usleep for arguments below one million
           * microseconds, so sleep in chunks of at most half a second */
          while (remaining > 0.)
            {
              NV chunk = remaining < 0.5 ? remaining : 0.5;

              usleep ((unsigned int)(chunk * 1e6));
              remaining -= chunk;
            }

          perlinterp_acquire ();
        }
231 root 1.2