ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.3
Committed: Sun Jun 28 08:04:02 2015 UTC (8 years, 10 months ago) by root
Branch: MAIN
Changes since 1.2: +21 -10 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
25 struct tctx
26 {
27 void *coro;
28 xcond_t wait_c;
29 int wait_f;
30 };
31
32 static struct tctx *tctx_free;
33
34 static int available;
35 static int max_idle = 8;
36
37 static xmutex_t perl_m = X_MUTEX_INIT;
38 static xcond_t perl_c = X_COND_INIT;
39 static struct tctx *perl_f;
40
41 static xmutex_t wait_m = X_MUTEX_INIT;
42
43 static int wakeup_f;
44 static struct tctx **waiters;
45 static int waiters_count, waiters_max;
46
47 static struct tctx *
48 tctx_get (void)
49 {
50 struct tctx *ctx;
51
52 if (!tctx_free)
53 {
54 ctx = malloc (sizeof (*tctx_free));
55 X_COND_CREATE (ctx->wait_c);
56 }
57 else
58 {
59 ctx = tctx_free;
60 tctx_free = tctx_free->coro;
61 }
62
63 return ctx;
64 }
65
66 static void
67 tctx_put (struct tctx *ctx)
68 {
69 ctx->coro = tctx_free;
70 tctx_free = ctx;
71 }
72
73 X_THREAD_PROC(thread_proc)
74 {
75 PERL_SET_CONTEXT (perl_thx);
76
77 {
78 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
79 struct tctx *ctx;
80
81 for (;;)
82 {
83 /* TODO: should really use some idle time and exit after that */
84 X_LOCK (perl_m);
85 while (!perl_f)
86 X_COND_WAIT (perl_c, perl_m);
87 ctx = perl_f;
88 perl_f = 0;
89 --available;
90 X_UNLOCK (perl_m);
91
92 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
93
94 while (ctx->coro)
95 CORO_SCHEDULE;
96
97 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
98
99 X_LOCK (wait_m);
100 ctx->wait_f = 1;
101 X_COND_SIGNAL (ctx->wait_c);
102
103 if (available >= max_idle)
104 {
105 X_UNLOCK (wait_m);
106 break;
107 }
108
109 ++available;
110 X_UNLOCK (wait_m);
111 }
112 }
113 }
114
115 static void
116 start_thread (void)
117 {
118 xthread_t tid;
119
120 ++available;
121 xthread_create (&tid, thread_proc, 0);
122 }
123
124 static void
125 pmapi_release (void)
126 {
127 struct tctx *ctx = tctx_get ();
128 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
129 ctx->wait_f = 0;
130
131 pthread_setspecific (current_key, ctx);
132 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
133
134 if (!available)
135 start_thread ();
136
137 X_LOCK (perl_m);
138 perl_f = ctx;
139 X_COND_SIGNAL (perl_c);
140 X_UNLOCK (perl_m);
141 }
142
143 static void
144 pmapi_acquire (void)
145 {
146 struct tctx *ctx = pthread_getspecific (current_key);
147
148 X_LOCK (wait_m);
149
150 if (waiters_count >= waiters_max)
151 {
152 waiters_max = waiters_max ? waiters_max * 2 : 16;
153 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
154 }
155
156 waiters [waiters_count++] = ctx;
157
158 s_epipe_signal (&ep);
159 while (!ctx->wait_f)
160 X_COND_WAIT (ctx->wait_c, wait_m);
161 X_UNLOCK (wait_m);
162
163 tctx_put (ctx);
164 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
165 }
166
167 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
168
169 PROTOTYPES: DISABLE
170
BOOT:
{
#ifndef _WIN32
        /* fullsigset is the mask installed by pmapi_release and thread_proc */
        sigfillset (&fullsigset);
#endif

        /* the key carries the tctx from pmapi_release to pmapi_acquire, so
           a failure here must not go unnoticed (0 means success) */
        if (pthread_key_create (&current_key, 0))
          croak ("Coro::Multicore: unable to create thread-specific key.\n");

        if (s_epipe_new (&ep))
          croak ("Coro::Multicore: unable to initialise event pipe.\n");

        /* remembered so that worker threads can install the same context */
        perl_thx = PERL_GET_CONTEXT;

        I_CORO_API ("Coro::Multicore");

        /* not perfectly efficient to do it this way, but it's simple */
        perl_multicore_init ();
        perl_multicore_api->pmapi_release = pmapi_release;
        perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
191
U32
max_idle_threads (U32 max = NO_INIT)
	CODE:
        /* returns the previous max_idle value; when an argument was passed,
           it becomes the new value. max_idle is read by thread_proc while
           holding wait_m, hence the lock. */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          {
            max_idle = max;
          }
        X_UNLOCK (wait_m);
	OUTPUT:
        RETVAL
202
203
int
fd ()
	CODE:
        {
          /* descriptor of the event pipe that pmapi_acquire signals */
          int pipe_fd = s_epipe_fd (&ep);

          RETVAL = pipe_fd;
        }
	OUTPUT:
        RETVAL
210
void
poll (...)
	CODE:
        /* drains the event pipe, then processes every context queued by
           pmapi_acquire, newest first: the stored coro is readied, its
           reference is dropped and the field is zeroed, which ends the
           scheduling loop in thread_proc for that context */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count > 0)
          {
            struct tctx *woken;
            SV *coro_sv;

            waiters_count -= 1;
            woken = waiters [waiters_count];
            coro_sv = (SV *)woken->coro;

            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            woken->coro = 0;
          }
        X_UNLOCK (wait_m);
224
void
sleep (NV seconds)
	CODE:
        /* sleeps for the given number of seconds (fractions allowed) between
           perlinterp_release and perlinterp_acquire. Zero, negative and NaN
           arguments do not sleep at all. */
        perlinterp_release ();
        {
          NV remaining = seconds;

          /* POSIX allows usleep to fail with EINVAL for arguments of
             1000000 or more, and converting a negative or very large
             double to its unsigned argument is undefined, so sleep in
             chunks of at most half a second */
          while (remaining > 0.)
            {
              NV chunk = remaining < 0.5 ? remaining : 0.5;

              usleep (chunk * 1e6);
              remaining -= chunk;
            }
        }
        perlinterp_acquire ();
231