… | |
… | |
20 | |
20 | |
21 | static s_epipe ep; |
21 | static s_epipe ep; |
22 | static void *perl_thx; |
22 | static void *perl_thx; |
23 | static sigset_t cursigset, fullsigset; |
23 | static sigset_t cursigset, fullsigset; |
24 | |
24 | |
25 | static int global_enable = 1; |
25 | static int global_enable = 0; |
26 | static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ |
26 | static int thread_enable; /* 0 undefined, 1 disabled, 2 enabled */ |
27 | |
27 | |
|
|
28 | /* assigned to a thread for each release/acquire */ |
28 | struct tctx |
29 | struct tctx |
29 | { |
30 | { |
30 | void *coro; |
31 | void *coro; |
|
|
32 | int wait_f; |
31 | xcond_t wait_c; |
33 | xcond_t wait_c; |
32 | int wait_f; |
|
|
33 | }; |
34 | }; |
34 | |
35 | |
35 | static struct tctx *tctx_free; |
36 | static struct tctx *tctx_free; |
36 | |
37 | |
37 | static int idle_timeout; |
|
|
38 | static int idle; |
38 | static int idle; |
39 | static int max_idle = 8; |
39 | static int min_idle = 1; |
40 | |
40 | |
41 | static xmutex_t perl_m = X_MUTEX_INIT; |
41 | static xmutex_t perl_m = X_MUTEX_INIT; |
42 | static xcond_t perl_c = X_COND_INIT; |
42 | static xcond_t perl_c = X_COND_INIT; |
43 | static struct tctx *perl_f; |
43 | static struct tctx *perl_f; |
44 | |
44 | |
… | |
… | |
80 | |
80 | |
81 | { |
81 | { |
82 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
82 | dTHX; /* inefficient, we already have perl_thx, but I see no better way */ |
83 | struct tctx *ctx; |
83 | struct tctx *ctx; |
84 | |
84 | |
|
|
85 | X_LOCK (perl_m); |
|
|
86 | |
85 | for (;;) |
87 | for (;;) |
86 | { |
88 | { |
87 | /* TODO: should really use some idle time and exit after that */ |
|
|
88 | X_LOCK (perl_m); |
|
|
89 | while (!perl_f) |
89 | while (!perl_f) |
|
|
90 | if (idle <= min_idle || 1) |
90 | X_COND_WAIT (perl_c, perl_m); |
91 | X_COND_WAIT (perl_c, perl_m); |
|
|
92 | else |
|
|
93 | { |
|
|
94 | struct timespec ts = { time (0) + idle - min_idle, 0 }; |
|
|
95 | |
|
|
96 | if (X_COND_TIMEDWAIT (perl_c, perl_m, ts) == ETIMEDOUT) |
|
|
97 | if (idle > min_idle && !perl_f) |
|
|
98 | break; |
|
|
99 | } |
|
|
100 | |
91 | ctx = perl_f; |
101 | ctx = perl_f; |
92 | perl_f = 0; |
102 | perl_f = 0; |
93 | --available; |
103 | --idle; |
94 | X_UNLOCK (perl_m); |
104 | X_UNLOCK (perl_m); |
|
|
105 | |
|
|
106 | if (!ctx) /* timed out? */ |
|
|
107 | break; |
95 | |
108 | |
96 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
109 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
97 | |
110 | |
98 | while (ctx->coro) |
111 | while (ctx->coro) |
99 | CORO_SCHEDULE; |
112 | CORO_SCHEDULE; |
… | |
… | |
101 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
114 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
102 | |
115 | |
103 | X_LOCK (wait_m); |
116 | X_LOCK (wait_m); |
104 | ctx->wait_f = 1; |
117 | ctx->wait_f = 1; |
105 | X_COND_SIGNAL (ctx->wait_c); |
118 | X_COND_SIGNAL (ctx->wait_c); |
106 | |
|
|
107 | if (available >= max_idle) |
|
|
108 | { |
|
|
109 | X_UNLOCK (wait_m); |
|
|
110 | break; |
|
|
111 | } |
|
|
112 | |
|
|
113 | ++available; |
|
|
114 | X_UNLOCK (wait_m); |
119 | X_UNLOCK (wait_m); |
|
|
120 | |
|
|
121 | X_LOCK (perl_m); |
|
|
122 | ++idle; |
115 | } |
123 | } |
116 | } |
124 | } |
117 | } |
125 | } |
118 | |
126 | |
119 | static void |
127 | static void |
120 | start_thread (void) |
128 | start_thread (void) |
121 | { |
129 | { |
122 | xthread_t tid; |
130 | xthread_t tid; |
123 | |
131 | |
124 | ++available; |
132 | ++idle; |
125 | xthread_create (&tid, thread_proc, 0); |
133 | xthread_create (&tid, thread_proc, 0); |
126 | } |
134 | } |
127 | |
135 | |
128 | static void |
136 | static void |
129 | pmapi_release (void) |
137 | pmapi_release (void) |
… | |
… | |
139 | ctx->wait_f = 0; |
147 | ctx->wait_f = 0; |
140 | |
148 | |
141 | pthread_setspecific (current_key, ctx); |
149 | pthread_setspecific (current_key, ctx); |
142 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
150 | pthread_sigmask (SIG_SETMASK, &fullsigset, &cursigset); |
143 | |
151 | |
144 | if (!available) |
152 | X_LOCK (perl_m); |
|
|
153 | |
|
|
154 | if (idle <= min_idle) |
145 | start_thread (); |
155 | start_thread (); |
146 | |
156 | |
147 | X_LOCK (perl_m); |
|
|
148 | perl_f = ctx; |
157 | perl_f = ctx; |
149 | X_COND_SIGNAL (perl_c); |
158 | X_COND_SIGNAL (perl_c); |
|
|
159 | |
150 | X_UNLOCK (perl_m); |
160 | X_UNLOCK (perl_m); |
151 | } |
161 | } |
152 | |
162 | |
153 | static void |
163 | static void |
154 | pmapi_acquire (void) |
164 | pmapi_acquire (void) |
… | |
… | |
176 | tctx_put (ctx); |
186 | tctx_put (ctx); |
177 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
187 | pthread_sigmask (SIG_SETMASK, &cursigset, 0); |
178 | } |
188 | } |
179 | |
189 | |
180 | static void |
190 | static void |
181 | set_enable_0 (pTHX) |
191 | set_thread_enable (pTHX_ void *arg) |
182 | { |
192 | { |
183 | thread_enable = 0; |
|
|
184 | } |
|
|
185 | |
|
|
186 | static void |
|
|
187 | set_enable_1 (pTHX) |
|
|
188 | { |
|
|
189 | thread_enable = 1; |
|
|
190 | } |
|
|
191 | |
|
|
192 | static void |
|
|
193 | set_enable_2 (pTHX) |
|
|
194 | { |
|
|
195 | thread_enable = 2; |
193 | thread_enable = PTR2IV (arg); |
196 | } |
194 | } |
197 | |
195 | |
198 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
196 | MODULE = Coro::Multicore PACKAGE = Coro::Multicore |
199 | |
197 | |
200 | PROTOTYPES: DISABLE |
198 | PROTOTYPES: DISABLE |
… | |
… | |
211 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
209 | croak ("Coro::Multicore: unable to initialise event pipe.\n"); |
212 | |
210 | |
213 | perl_thx = PERL_GET_CONTEXT; |
211 | perl_thx = PERL_GET_CONTEXT; |
214 | |
212 | |
215 | I_CORO_API ("Coro::Multicore"); |
213 | I_CORO_API ("Coro::Multicore"); |
|
|
214 | |
|
|
215 | X_LOCK (perl_m); |
|
|
216 | while (idle < min_idle) |
|
|
217 | start_thread (); |
|
|
218 | start_thread ();//D |
|
|
219 | X_UNLOCK (perl_m); |
216 | |
220 | |
217 | /* not perfectly efficient to do it this way, but it's simple */ |
221 | /* not perfectly efficient to do it this way, but it's simple */ |
218 | perl_multicore_init (); |
222 | perl_multicore_init (); |
219 | perl_multicore_api->pmapi_release = pmapi_release; |
223 | perl_multicore_api->pmapi_release = pmapi_release; |
220 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
224 | perl_multicore_api->pmapi_acquire = pmapi_acquire; |
… | |
… | |
231 | |
235 | |
232 | void |
236 | void |
233 | scoped_enable () |
237 | scoped_enable () |
234 | CODE: |
238 | CODE: |
235 | LEAVE; /* see Guard.xs */ |
239 | LEAVE; /* see Guard.xs */ |
236 | CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_1, set_enable_0); |
240 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)1, set_thread_enable, (void *)0); |
237 | ENTER; /* see Guard.xs */ |
241 | ENTER; /* see Guard.xs */ |
238 | |
242 | |
239 | void |
243 | void |
240 | scoped_disable () |
244 | scoped_disable () |
241 | CODE: |
245 | CODE: |
242 | LEAVE; /* see Guard.xs */ |
246 | LEAVE; /* see Guard.xs */ |
243 | CORO_ENTERLEAVE_SCOPE_HOOK (set_enable_2, set_enable_0); |
247 | CORO_ENTERLEAVE_SCOPE_HOOK (set_thread_enable, (void *)2, set_thread_enable, (void *)0); |
244 | ENTER; /* see Guard.xs */ |
248 | ENTER; /* see Guard.xs */ |
245 | |
249 | |
246 | U32 |
250 | U32 |
247 | max_idle_threads (U32 max = NO_INIT) |
251 | min_idle_threads (U32 min = NO_INIT) |
248 | CODE: |
252 | CODE: |
249 | X_LOCK (wait_m); |
253 | X_LOCK (wait_m); |
250 | RETVAL = max_idle; |
254 | RETVAL = min_idle; |
251 | if (items) |
255 | if (items) |
252 | max_idle = max; |
256 | min_idle = min; |
253 | X_UNLOCK (wait_m); |
257 | X_UNLOCK (wait_m); |
254 | OUTPUT: |
258 | OUTPUT: |
255 | RETVAL |
259 | RETVAL |
256 | |
260 | |
257 | |
261 | |