ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
Revision: 1.2
Committed: Sat Jun 27 18:13:34 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Changes since 1.1: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define PERL_NO_GET_CONTEXT
2
3 #include "EXTERN.h"
4 #include "perl.h"
5 #include "XSUB.h"
6
7 #define X_STACKSIZE -1
8
9 #include "CoroAPI.h"
10 #include "perlmulticore.h"
11 #include "schmorp.h"
12 #include "xthread.h"
13
/* On _WIN32 builds: supply a placeholder sigset_t and let every
   pthread_sigmask () call in this file expand to nothing. */
#if defined (_WIN32)
typedef char sigset_t;
# define pthread_sigmask(mode,new,old)
#endif
18
19 static pthread_key_t current_key;
20
21 static s_epipe ep;
22 static void *perl_thx;
23 static sigset_t cursigset, fullsigset;
24
/* Bookkeeping record for one release/acquire cycle. */
struct tctx {
  /* Set by pmapi_release to the (reference-counted) current coro and
     cleared by poll once that coro has been readied.  While the record
     sits on the free list the same field is used as the "next" link. */
  void *coro;
};
29
30 static struct tctx *tctx_free;
31
32 static int available;
33 static int max_idle = 8;
34
35 static xmutex_t perl_m = X_MUTEX_INIT; static xcond_t perl_c = X_COND_INIT; static struct tctx *perl_f;
36 static xmutex_t wait_m = X_MUTEX_INIT; static xcond_t wait_c = X_COND_INIT; static int wait_f;
37
38 static int wakeup_f;
39 static struct tctx **waiters;
40 static int waiters_count, waiters_max;
41
42 static struct tctx *
43 tctx_get (void)
44 {
45 struct tctx *ctx;
46
47 if (!tctx_free)
48 ctx = malloc (sizeof (*tctx_free));
49 else
50 {
51 ctx = tctx_free;
52 tctx_free = tctx_free->coro;
53 }
54
55 return ctx;
56 }
57
58 static void
59 tctx_put (struct tctx *ctx)
60 {
61 ctx->coro = tctx_free;
62 tctx_free = ctx;
63 }
64
65 X_THREAD_PROC(thread_proc)
66 {
67 PERL_SET_CONTEXT (perl_thx);
68
69 {
70 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
71 struct tctx *ctx;
72
73 for (;;)
74 {
75 /* TODO: should really use some idle time and exit after that */
76 X_LOCK (perl_m);
77 while (!perl_f)
78 X_COND_WAIT (perl_c, perl_m);
79 ctx = perl_f;
80 perl_f = 0;
81 --available;
82 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
83 X_UNLOCK (perl_m);
84
85 while (ctx->coro)
86 CORO_SCHEDULE;
87
88 X_LOCK (wait_m);
89 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
90 wait_f = 1;
91 X_COND_SIGNAL (wait_c);
92
93 if (available >= max_idle)
94 {
95 X_UNLOCK (wait_m);
96 break;
97 }
98
99 ++available;
100 X_UNLOCK (wait_m);
101 }
102 }
103 }
104
105 static void
106 start_thread (void)
107 {
108 xthread_t tid;
109
110 ++available;
111 xthread_create (&tid, thread_proc, 0);
112 }
113
114 static void
115 pmapi_release (void)
116 {
117 struct tctx *ctx = tctx_get ();
118 ctx->coro = SvREFCNT_inc_NN (CORO_CURRENT);
119 pthread_setspecific (current_key, ctx);
120 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
121
122 if (!available)
123 start_thread ();
124
125 X_LOCK (perl_m);
126 perl_f = ctx;
127 X_COND_SIGNAL (perl_c);
128 X_UNLOCK (perl_m);
129 }
130
131 static void
132 pmapi_acquire (void)
133 {
134 struct tctx *ctx = pthread_getspecific (current_key);
135
136 X_LOCK (wait_m);
137
138 if (waiters_count >= waiters_max)
139 {
140 waiters_max = waiters_max ? waiters_max * 2 : 16;
141 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
142 }
143
144 waiters [waiters_count++] = ctx;
145
146 s_epipe_signal (&ep);
147 while (!wait_f)
148 X_COND_WAIT (wait_c, wait_m);
149 wait_f = 0;
150 X_UNLOCK (wait_m);
151
152 tctx_put (ctx);
153 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
154 }
155
156 MODULE = Coro::Multicore PACKAGE = Coro::Multicore
157
158 PROTOTYPES: DISABLE
159
BOOT:
{
#ifndef _WIN32
  /* the mask pmapi_release installs to block every signal */
  sigfillset (&fullsigset);
#endif

  /* pthread_key_create reports failure with a non-zero result; without a
     valid key pmapi_acquire could not find its context again */
  if (pthread_key_create (&current_key, 0))
    croak ("Coro::Multicore: unable to create thread-specific key.\n");

  if (s_epipe_new (&ep))
    croak ("Coro::Multicore: unable to initialise event pipe.\n");

  /* remembered so that worker threads can install the same context */
  perl_thx = PERL_GET_CONTEXT;

  I_CORO_API ("Coro::Multicore");

  /* not perfectly efficient to do it this way, but it's simple */
  perl_multicore_init ();
  perl_multicore_api->pmapi_release = pmapi_release;
  perl_multicore_api->pmapi_acquire = pmapi_acquire;
}
180
U32
max_idle_threads (U32 max = NO_INIT)
	CODE:
        /* Returns the current limit; when an argument is passed it becomes
           the new limit.  wait_m is the lock under which worker threads
           read max_idle. */
        X_LOCK (wait_m);
        RETVAL = max_idle;
        if (items > 0)
          max_idle = max;
        X_UNLOCK (wait_m);
	OUTPUT:
        RETVAL
191
192
int
fd ()
	CODE:
        /* descriptor of the event pipe that pmapi_acquire signals */
        RETVAL = s_epipe_fd (&ep);
	OUTPUT:
        RETVAL
199
void
poll (...)
	CODE:
        /* Drain the event pipe, then ready every coro queued by
           pmapi_acquire, newest first, dropping the reference taken in
           pmapi_release and clearing ->coro. */
        s_epipe_drain (&ep);
        X_LOCK (wait_m);
        while (waiters_count > 0)
          {
            struct tctx *ctx;
            SV *coro_sv;

            waiters_count -= 1;
            ctx = waiters [waiters_count];
            coro_sv = (SV *)ctx->coro;

            CORO_READY (coro_sv);
            SvREFCNT_dec_NN (coro_sv);
            ctx->coro = 0;
          }
        X_UNLOCK (wait_m);
213
void
sleep (NV seconds)
	CODE:
        /* Block for the given number of (fractional) seconds with the
           interpreter released.  Zero, negative and NaN durations do not
           block at all. */
        perlinterp_release ();
        if (seconds > 0.)
          {
            /* usleep () is only specified for arguments below 1000000, and
               converting "seconds * 1e6" to its integer argument was out of
               range for long sleeps, so whole seconds and the sub-second
               rest are handled separately (whole seconds are capped). */
            unsigned int whole = seconds < 2147483647. ? (unsigned int)seconds : 2147483647U;
            NV rest = seconds - (NV)whole;
            unsigned int usec = rest < 1. ? (unsigned int)(rest * 1e6) : 0;

            if (usec > 999999U)
              usec = 999999U;

            /* sleep () returns the unslept remainder when it ends early */
            while (whole)
              whole = sleep (whole);

            if (usec)
              usleep (usec);
          }
        perlinterp_acquire ();
220