ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.1
Committed: Sat Jun 27 17:59:10 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
14 #ifdef _WIN32
15 typedef char sigset_t;
16 #define pthread_sigmask(mode,new,old)
17 #endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
/*
 * Per-release bookkeeping record.
 *
 * coro does double duty:
 *  - while in use it holds the coro SV stored by pmapi_release
 *    (non-zero until poll clears it),
 *  - while the record sits on the tctx_free list it is the link
 *    to the next free record.
 */
struct tctx
{
  void *coro;
};
29
30 static struct tctx *tctx_free;
31
32 static int available;
33 static int max_idle = 8;
34
35 static xmutex_t perl_m = X_MUTEX_INIT; static xcond_t perl_c = X_COND_INIT; static struct tctx *perl_f;
36 static xmutex_t wait_m = X_MUTEX_INIT; static xcond_t wait_c = X_COND_INIT; static int wait_f;
37
38 static int wakeup_f;
39 static struct tctx **waiters;
40 static int waiters_count, waiters_max;
41
42 static struct tctx *
43 tctx_get (void)
44 {
45 struct tctx *ctx;
46
47 if (!tctx_free)
48 ctx = malloc (sizeof (*tctx_free));
49 else
50 {
51 ctx = tctx_free;
52 tctx_free = tctx_free->coro;
53 }
54
55 return ctx;
56 }
57
58 static void
59 tctx_put (struct tctx *ctx)
60 {
61 ctx->coro = tctx_free;
62 tctx_free = ctx;
63 }
64
65 X_THREAD_PROC(thread_proc)
66 {
67 PERL_SET_CONTEXT (perl_thx);
68
69 {
70 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
71 struct tctx *ctx;
72
73 for (;;)
74 {
75 X_LOCK (perl_m);
76 while (!perl_f)
77 X_COND_WAIT (perl_c, perl_m);
78 ctx = perl_f;
79 perl_f = 0;
80 --available;
81 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
82 X_UNLOCK (perl_m);
83
84 while (ctx->coro)
85 CORO_SCHEDULE;
86
87 X_LOCK (wait_m);
88 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
89 wait_f = 1;
90 X_COND_SIGNAL (wait_c);
91
92 if (available >= max_idle)
93 {
94 X_UNLOCK (wait_m);
95 break;
96 }
97
98 ++available;
99 X_UNLOCK (wait_m);
100 }
101 }
102 }
103
104 static void
105 start_thread (void)
106 {
107 xthread_t tid;
108
109 ++available;
110 xthread_create (&tid, thread_proc, 0);
111 }
112
113 static void
114 pmapi_release (void)
115 {
116 struct tctx *ctx = tctx_get ();
117 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
118 pthread_setspecific (current_key, ctx);
119 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
120
121 if (!available)
122 start_thread ();
123
124 X_LOCK (perl_m);
125 perl_f = ctx;
126 X_COND_SIGNAL (perl_c);
127 X_UNLOCK (perl_m);
128 }
129
130 static void
131 pmapi_acquire (void)
132 {
133 struct tctx *ctx = pthread_getspecific (current_key);
134
135 X_LOCK (wait_m);
136
137 if (waiters_count >= waiters_max)
138 {
139 waiters_max = waiters_max ? waiters_max * 2 : 16;
140 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
141 }
142
143 waiters [waiters_count++] = ctx;
144
145 s_epipe_signal (&ep);
146 while (!wait_f)
147 X_COND_WAIT (wait_c, wait_m);
148 wait_f = 0;
149 X_UNLOCK (wait_m);
150
151 tctx_put (ctx);
152 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
153 }
154
155 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
156
157 PROTOTYPES: DISABLE
158
BOOT:
{
#ifndef _WIN32
  sigfillset (&fullsigset);
#endif
  /* module initialisation: signal set, thread-specific key, event pipe,
     perl context for the workers, Coro API and the perlmulticore hooks */

  /* pthread_key_create reports failure with a non-zero error number;
     pmapi_release/pmapi_acquire depend on this key being valid */
  if (pthread_key_create (&current_key, 0))
    croak ("Coro::Multicore: unable to create thread-specific key.\n");

  if (s_epipe_new (&ep))
    croak ("Coro::Multicore: unable to initialise event pipe.\n");

  /* remembered so that thread_proc can install it in worker threads */
  perl_thx = PERL_GET_CONTEXT;

  I_CORO_API ("Coro::Multicore");

  /* not perfectly efficient to do it this way, but it's simple */
  perl_multicore_init ();
  perl_multicore_api->pmapi_release = pmapi_release;
  perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
179
U32
max_idle_threads (U32 max = NO_INIT)
	CODE:
        /* returns the previous max_idle value; when an argument was
           passed, it becomes the new value (both done under wait_m) */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          max_idle = max;
        X_UNLOCK (wait_m);
	OUTPUT:
        RETVAL
190
191
int
fd ()
	CODE:
        /* file descriptor of the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL
198
void
poll (...)
	CODE:
        /* drain the event pipe, then empty the waiters queue (last entry
           first): ready each coro, drop the reference taken in
           pmapi_release and clear ctx->coro, which ends the scheduling
           loop in thread_proc */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count != 0)
          {
            struct tctx *ctx = waiters [waiters_count - 1];
            SV *coro_sv = (SV *)ctx->coro;

            --waiters_count;
            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            ctx->coro = 0;
          }
        X_UNLOCK (wait_m);
212
void
tst (int i)
	CODE:
        /* test helper: give up the interpreter, sleep for i times
           100000 microseconds, then take the interpreter back */
        perlinterp_release ();
        usleep (i * 100000);
        perlinterp_acquire ();