ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
(Generate patch)

Comparing Coro-Multicore/Multicore.xs (file contents):
Revision 1.8 by root, Wed Jul 1 22:39:07 2015 UTC vs.
Revision 1.9 by root, Fri Jul 3 02:35:48 2015 UTC

36/* assigned to a thread for each release/acquire */ 36/* assigned to a thread for each release/acquire */
37struct tctx 37struct tctx
38{ 38{
39 void *coro; 39 void *coro;
40 int wait_f; 40 int wait_f;
41 xcond_t wait_c; 41 xcond_t acquire_c;
42}; 42};
43 43
44static struct tctx *tctx_free; 44static struct tctx *tctx_free;
45 45
46static struct tctx *
47tctx_get (void)
48{
49 struct tctx *ctx;
50
51 if (!tctx_free)
52 {
53 ctx = malloc (sizeof (*tctx_free));
54 X_COND_CREATE (ctx->acquire_c);
55 }
56 else
57 {
58 ctx = tctx_free;
59 tctx_free = tctx_free->coro;
60 }
61
62 return ctx;
63}
64
65static void
66tctx_put (struct tctx *ctx)
67{
68 ctx->coro = tctx_free;
69 tctx_free = ctx;
70}
71
72/* a stack of tctxs */
73struct tctxs
74{
75 struct tctx **ctxs;
76 int cur, max;
77};
78
79static struct tctx *
80tctxs_get (struct tctxs *ctxs)
81{
82 return ctxs->ctxs[--ctxs->cur];
83}
84
85static void
86tctxs_put (struct tctxs *ctxs, struct tctx *ctx)
87{
88 if (ctxs->cur >= ctxs->max)
89 {
90 ctxs->max = ctxs->max ? ctxs->max * 2 : 16;
91 ctxs->ctxs = realloc (ctxs->ctxs, ctxs->max * sizeof (ctxs->ctxs[0]));
92 }
93
94 ctxs->ctxs[ctxs->cur++] = ctx;
95}
96
97static xmutex_t release_m = X_MUTEX_INIT;
98static xcond_t release_c = X_COND_INIT;
99static struct tctxs releasers;
46static int idle; 100static int idle;
47static int min_idle = 1; 101static int min_idle = 1;
102static int curthreads, max_threads = 1; /* protected by release_m */
48 103
49static xmutex_t perl_m = X_MUTEX_INIT;
50static xcond_t perl_c = X_COND_INIT;
51static struct tctx *perl_f;
52
53static xmutex_t wait_m = X_MUTEX_INIT; 104static xmutex_t acquire_m = X_MUTEX_INIT;
54
55static int wakeup_f;
56static struct tctx **waiters; 105static struct tctxs acquirers;
57static int waiters_count, waiters_max;
58
59static struct tctx *
60tctx_get (void)
61{
62 struct tctx *ctx;
63
64 if (!tctx_free)
65 {
66 ctx = malloc (sizeof (*tctx_free));
67 X_COND_CREATE (ctx->wait_c);
68 }
69 else
70 {
71 ctx = tctx_free;
72 tctx_free = tctx_free->coro;
73 }
74
75 return ctx;
76}
77
78static void
79tctx_put (struct tctx *ctx)
80{
81 ctx->coro = tctx_free;
82 tctx_free = ctx;
83}
84 106
85X_THREAD_PROC(thread_proc) 107X_THREAD_PROC(thread_proc)
86{ 108{
87 PERL_SET_CONTEXT (perl_thx); 109 PERL_SET_CONTEXT (perl_thx);
88 110
89 { 111 {
90 dTHX; /* inefficient, we already have perl_thx, but I see no better way */ 112 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
91 struct tctx *ctx; 113 struct tctx *ctx;
92 114
93 X_LOCK (perl_m); 115 X_LOCK (release_m);
94 116
95 for (;;) 117 for (;;)
96 { 118 {
97 while (!perl_f) 119 while (!releasers.cur)
98 if (idle <= min_idle || 1) 120 if (idle <= min_idle || 1)
99 X_COND_WAIT (perl_c, perl_m); 121 X_COND_WAIT (release_c, release_m);
100 else 122 else
101 { 123 {
102 struct timespec ts = { time (0) + idle - min_idle, 0 }; 124 struct timespec ts = { time (0) + idle - min_idle, 0 };
103 125
104 if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT) 126 if (X_COND_TIMEDWAIT (release_c, release_m, ts) == ETIMEDOUT)
105 if (idle > min_idle && !perl_f) 127 if (idle > min_idle && !releasers.cur)
106 break; 128 break;
107 } 129 }
108 130
109 ctx = perl_f; 131 ctx = tctxs_get (&releasers);
110 perl_f = 0;
111 --idle; 132 --idle;
112 X_UNLOCK (perl_m); 133 X_UNLOCK (release_m);
113 134
114 if (!ctx) /* timed out? */ 135 if (!ctx) /* timed out? */
115 break; 136 break;
116 137
117 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 138 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
119 while (ctx->coro) 140 while (ctx->coro)
120 CORO_SCHEDULE; 141 CORO_SCHEDULE;
121 142
122 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 143 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
123 144
124 X_LOCK (wait_m); 145 X_LOCK (acquire_m);
125 ctx->wait_f = 1; 146 ctx->wait_f = 1;
126 X_COND_SIGNAL (ctx->wait_c); 147 X_COND_SIGNAL (ctx->acquire_c);
127 X_UNLOCK (wait_m); 148 X_UNLOCK (acquire_m);
128 149
129 X_LOCK (perl_m); 150 X_LOCK (release_m);
130 ++idle; 151 ++idle;
131 } 152 }
132 } 153 }
133} 154}
134 155
135static void 156static void
136start_thread (void) 157start_thread (void)
137{ 158{
138 xthread_t tid; 159 xthread_t tid;
139 160
161 if (curthreads >= max_threads && 0)
162 return;
163
164 ++curthreads;
140 ++idle; 165 ++idle;
141 xthread_create (&tid, thread_proc, 0); 166 xthread_create (&tid, thread_proc, 0);
142} 167}
143 168
144static void 169static void
155 ctx->wait_f = 0; 180 ctx->wait_f = 0;
156 181
157 pthread_setspecific (current_key, ctx); 182 pthread_setspecific (current_key, ctx);
158 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 183 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
159 184
160 X_LOCK (perl_m); 185 X_LOCK (release_m);
161 186
162 if (idle <= min_idle) 187 if (idle <= min_idle)
163 start_thread (); 188 start_thread ();
164 189
165 perl_f = ctx; 190 tctxs_put (&releasers, ctx);
166 X_COND_SIGNAL (perl_c); 191 X_COND_SIGNAL (release_c);
167 192
193 while (!idle && releasers.cur)
194 {
195 X_UNLOCK (release_m);
196 X_LOCK (release_m);
197 }
198
168 X_UNLOCK (perl_m); 199 X_UNLOCK (release_m);
169} 200}
170 201
171static void 202static void
172pmapi_acquire (void) 203pmapi_acquire (void)
173{ 204{
174 struct tctx *ctx = pthread_getspecific (current_key); 205 struct tctx *ctx = pthread_getspecific (current_key);
175 206
176 if (!ctx) 207 if (!ctx)
177 return; 208 return;
178 209
179 X_LOCK (wait_m); 210 X_LOCK (acquire_m);
180 211
181 if (waiters_count >= waiters_max) 212 tctxs_put (&acquirers, ctx);
182 {
183 waiters_max = waiters_max ? waiters_max * 2 : 16;
184 waiters = realloc (waiters, waiters_max * sizeof (*waiters));
185 }
186
187 waiters [waiters_count++] = ctx;
188 213
189 s_epipe_signal (&ep); 214 s_epipe_signal (&ep);
190 while (!ctx->wait_f) 215 while (!ctx->wait_f)
191 X_COND_WAIT (ctx->wait_c, wait_m); 216 X_COND_WAIT (ctx->acquire_c, acquire_m);
192 X_UNLOCK (wait_m); 217 X_UNLOCK (acquire_m);
193 218
194 tctx_put (ctx); 219 tctx_put (ctx);
195 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 220 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
196} 221}
197 222
207 232
208BOOT: 233BOOT:
209{ 234{
210 #ifndef _WIN32 235 #ifndef _WIN32
211 sigfillset (&fullsigset); 236 sigfillset (&fullsigset);
237 sigemptyset (&fullsigset);
212 #endif 238 #endif
213 239
214 pthread_key_create (&current_key, 0); 240 pthread_key_create (&current_key, 0);
215 241
216 if (s_epipe_new (&ep)) 242 if (s_epipe_new (&ep))
218 244
219 perl_thx = PERL_GET_CONTEXT; 245 perl_thx = PERL_GET_CONTEXT;
220 246
221 I_CORO_API ("Coro::Multicore"); 247 I_CORO_API ("Coro::Multicore");
222 248
223 X_LOCK (perl_m); 249 X_LOCK (release_m);
224 while (idle < min_idle) 250 while (idle < min_idle)
225 start_thread (); 251 start_thread ();
226 start_thread ();//D
227 X_UNLOCK (perl_m); 252 X_UNLOCK (release_m);
228 253
229 /* not perfectly efficient to do it this way, but it's simple */ 254 /* not perfectly efficient to do it this way, but it's simple */
230 perl_multicore_init (); 255 perl_multicore_init ();
231 perl_multicore_api->pmapi_release = pmapi_release; 256 perl_multicore_api->pmapi_release = pmapi_release;
232 perl_multicore_api->pmapi_acquire = pmapi_acquire; 257 perl_multicore_api->pmapi_acquire = pmapi_acquire;
256 ENTER; /* see Guard.xs */ 281 ENTER; /* see Guard.xs */
257 282
258U32 283U32
259min_idle_threads (U32 min = NO_INIT) 284min_idle_threads (U32 min = NO_INIT)
260 CODE: 285 CODE:
261 X_LOCK (wait_m); 286 X_LOCK (acquire_m);
262 RETVAL = min_idle; 287 RETVAL = min_idle;
263 if (items) 288 if (items)
264 min_idle = min; 289 min_idle = min;
265 X_UNLOCK (wait_m); 290 X_UNLOCK (acquire_m);
266 OUTPUT: 291 OUTPUT:
267 RETVAL 292 RETVAL
268 293
269 294
270int 295int
276 301
277void 302void
278poll (...) 303poll (...)
279 CODE: 304 CODE:
280 s_epipe_drain (&ep); 305 s_epipe_drain (&ep);
281 X_LOCK (wait_m); 306 X_LOCK (acquire_m);
282 while (waiters_count) 307 while (acquirers.cur)
283 { 308 {
284 struct tctx *ctx = waiters [--waiters_count]; 309 struct tctx *ctx = tctxs_get (&acquirers);
285 CORO_READY ((SV *)ctx->coro); 310 CORO_READY ((SV *)ctx->coro);
286 SvREFCNT_dec_NN ((SV *)ctx->coro); 311 SvREFCNT_dec_NN ((SV *)ctx->coro);
287 ctx->coro = 0; 312 ctx->coro = 0;
288 } 313 }
289 X_UNLOCK (wait_m); 314 X_UNLOCK (acquire_m);
290 315
291void 316void
292sleep (NV seconds) 317sleep (NV seconds)
293 CODE: 318 CODE:
294 perlinterp_release (); 319 perlinterp_release ();

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines