ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-Multicore/Multicore.xs
(Generate patch)

Comparing Coro-Multicore/Multicore.xs (file contents):
Revision 1.5 by root, Sun Jun 28 18:43:17 2015 UTC vs.
Revision 1.6 by root, Mon Jun 29 13:15:53 2015 UTC

20 20
21static s_epipe ep; 21static s_epipe ep;
22static void *perl_thx; 22static void *perl_thx;
23static sigset_t cursigset, fullsigset; 23static sigset_t cursigset, fullsigset;
24 24
25static int global_enable = 1; 25static int global_enable = 0;
26static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ 26static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */
27 27
28/* assigned to a thread for each release/acquire */
28struct tctx 29struct tctx
29{ 30{
30 void *coro; 31 void *coro;
32 int wait_f;
31 xcond_t wait_c; 33 xcond_t wait_c;
32 int wait_f;
33}; 34};
34 35
35static struct tctx *tctx_free; 36static struct tctx *tctx_free;
36 37
37static int idle_timeout;
38static int idle; 38static int idle;
39static int max_idle = 8; 39static int min_idle = 1;
40 40
41static xmutex_t perl_m = X_MUTEX_INIT; 41static xmutex_t perl_m = X_MUTEX_INIT;
42static xcond_t perl_c = X_COND_INIT; 42static xcond_t perl_c = X_COND_INIT;
43static struct tctx *perl_f; 43static struct tctx *perl_f;
44 44
80 80
81 { 81 {
82 dTHX; /* inefficient, we already have perl_thx, but I see no better way */ 82 dTHX; /* inefficient, we already have perl_thx, but I see no better way */
83 struct tctx *ctx; 83 struct tctx *ctx;
84 84
85 X_LOCK (perl_m);
86
85 for (;;) 87 for (;;)
86 { 88 {
87 /* TODO: should really use some idle time and exit after that */
88 X_LOCK (perl_m);
89 while (!perl_f) 89 while (!perl_f)
90 if (idle <= min_idle || 1)
90 X_COND_WAIT (perl_c, perl_m); 91 X_COND_WAIT (perl_c, perl_m);
92 else
93 {
94 struct timespec ts = { time (0) + idle - min_idle, 0 };
95
96 if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT)
97 if (idle > min_idle && !perl_f)
98 break;
99 }
100
91 ctx = perl_f; 101 ctx = perl_f;
92 perl_f = 0; 102 perl_f = 0;
93 --available; 103 --idle;
94 X_UNLOCK (perl_m); 104 X_UNLOCK (perl_m);
105
106 if (!ctx) /* timed out? */
107 break;
95 108
96 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 109 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
97 110
98 while (ctx->coro) 111 while (ctx->coro)
99 CORO_SCHEDULE; 112 CORO_SCHEDULE;
101 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 114 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
102 115
103 X_LOCK (wait_m); 116 X_LOCK (wait_m);
104 ctx->wait_f = 1; 117 ctx->wait_f = 1;
105 X_COND_SIGNAL (ctx->wait_c); 118 X_COND_SIGNAL (ctx->wait_c);
106
107 if (available >= max_idle)
108 {
109 X_UNLOCK (wait_m);
110 break;
111 }
112
113 ++available;
114 X_UNLOCK (wait_m); 119 X_UNLOCK (wait_m);
120
121 X_LOCK (perl_m);
122 ++idle;
115 } 123 }
116 } 124 }
117} 125}
118 126
119static void 127static void
120start_thread (void) 128start_thread (void)
121{ 129{
122 xthread_t tid; 130 xthread_t tid;
123 131
124 ++available; 132 ++idle;
125 xthread_create (&tid, thread_proc, 0); 133 xthread_create (&tid, thread_proc, 0);
126} 134}
127 135
128static void 136static void
129pmapi_release (void) 137pmapi_release (void)
139 ctx->wait_f = 0; 147 ctx->wait_f = 0;
140 148
141 pthread_setspecific (current_key, ctx); 149 pthread_setspecific (current_key, ctx);
142 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); 150 pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset);
143 151
144 if (!available) 152 X_LOCK (perl_m);
153
154 if (idle <= min_idle)
145 start_thread (); 155 start_thread ();
146 156
147 X_LOCK (perl_m);
148 perl_f = ctx; 157 perl_f = ctx;
149 X_COND_SIGNAL (perl_c); 158 X_COND_SIGNAL (perl_c);
159
150 X_UNLOCK (perl_m); 160 X_UNLOCK (perl_m);
151} 161}
152 162
153static void 163static void
154pmapi_acquire (void) 164pmapi_acquire (void)
176 tctx_put (ctx); 186 tctx_put (ctx);
177 pthread_sigmask (SIG_SETMASK, &cursigset, 0); 187 pthread_sigmask (SIG_SETMASK, &cursigset, 0);
178} 188}
179 189
180static void 190static void
181set_enable_0 (pTHX) 191set_thread_enable (pTHX_ void *arg)
182{ 192{
183 thread_enable = 0;
184}
185
186static void
187set_enable_1 (pTHX)
188{
189 thread_enable = 1;
190}
191
192static void
193set_enable_2 (pTHX)
194{
195 thread_enable = 2; 193 thread_enable = PTR2IV (arg);
196} 194}
197 195
198MODULE = Coro::Multicore PACKAGE = Coro::Multicore 196MODULE = Coro::Multicore PACKAGE = Coro::Multicore
199 197
200PROTOTYPES: DISABLE 198PROTOTYPES: DISABLE
211 croak ("Coro::Multicore: unable to initialise event pipe.\n"); 209 croak ("Coro::Multicore: unable to initialise event pipe.\n");
212 210
213 perl_thx = PERL_GET_CONTEXT; 211 perl_thx = PERL_GET_CONTEXT;
214 212
215 I_CORO_API ("Coro::Multicore"); 213 I_CORO_API ("Coro::Multicore");
214
215 X_LOCK (perl_m);
216 while (idle < min_idle)
217 start_thread ();
218 start_thread ();//D
219 X_UNLOCK (perl_m);
216 220
217 /* not perfectly efficient to do it this way, but it's simple */ 221 /* not perfectly efficient to do it this way, but it's simple */
218 perl_multicore_init (); 222 perl_multicore_init ();
219 perl_multicore_api->pmapi_release = pmapi_release; 223 perl_multicore_api->pmapi_release = pmapi_release;
220 perl_multicore_api->pmapi_acquire = pmapi_acquire; 224 perl_multicore_api->pmapi_acquire = pmapi_acquire;
231 235
232void 236void
233scoped_enable () 237scoped_enable ()
234 CODE: 238 CODE:
235 LEAVE; /* see Guard.xs */ 239 LEAVE; /* see Guard.xs */
236 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0); 240 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0);
237 ENTER; /* see Guard.xs */ 241 ENTER; /* see Guard.xs */
238 242
239void 243void
240scoped_disable () 244scoped_disable ()
241 CODE: 245 CODE:
242 LEAVE; /* see Guard.xs */ 246 LEAVE; /* see Guard.xs */
243 CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0); 247 CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0);
244 ENTER; /* see Guard.xs */ 248 ENTER; /* see Guard.xs */
245 249
246U32 250U32
247max_idle_threads (U32 max = NO_INIT) 251min_idle_threads (U32 min = NO_INIT)
248 CODE: 252 CODE:
249 X_LOCK (wait_m); 253 X_LOCK (wait_m);
250 RETVAL = max_idle; 254 RETVAL = min_idle;
251 if (items) 255 if (items)
252 max_idle = max; 256 min_idle = min;
253 X_UNLOCK (wait_m); 257 X_UNLOCK (wait_m);
254 OUTPUT: 258 OUTPUT:
255 RETVAL 259 RETVAL
256 260
257 261

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines