… | |
… | |
22 | |
22 | |
23 | #include "etp.h" |
23 | #include "etp.h" |
24 | |
24 | |
25 | static int req_finish (ETP_REQ *req); |
25 | static int req_finish (ETP_REQ *req); |
26 | #define ETP_FINISH(req) req_finish (req) |
26 | #define ETP_FINISH(req) req_finish (req) |
27 | static void req_destroy (ETP_REQ *req); |
|
|
28 | #define ETP_DESTROY(req) req_destroy (req); |
|
|
29 | #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req)) |
27 | #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req)) |
30 | #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req)) |
28 | #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req)) |
31 | |
29 | |
32 | #include "etp.c" |
30 | #include "etp.c" |
33 | |
31 | |
… | |
… | |
87 | else |
85 | else |
88 | { |
86 | { |
89 | req->grp_next = self->freelist; |
87 | req->grp_next = self->freelist; |
90 | self->freelist = req; |
88 | self->freelist = req; |
91 | self->freesize += req->allocated; |
89 | self->freesize += req->allocated; |
92 | |
|
|
93 | printf ("freelist now %d\n", self->freesize);//D |
|
|
94 | } |
90 | } |
95 | } |
|
|
96 | |
|
|
97 | static int |
|
|
98 | req_finish (ETP_REQ *req) |
|
|
99 | { |
|
|
100 | dSP; |
|
|
101 | |
|
|
102 | if (ecb_expect_false (ETP_CANCELLED (req))) |
|
|
103 | return 0; |
|
|
104 | |
|
|
105 | ENTER; |
|
|
106 | SAVETMPS; |
|
|
107 | PUSHMARK (SP); |
|
|
108 | |
|
|
109 | PUTBACK; |
|
|
110 | |
|
|
111 | if (req->handler->xstpreq_finish) |
|
|
112 | { |
|
|
113 | dTHX; |
|
|
114 | req->handler->xstpreq_finish (aTHX_ XSTPREQ (req)); |
|
|
115 | } |
|
|
116 | |
|
|
117 | call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD); |
|
|
118 | |
|
|
119 | SPAGAIN; |
|
|
120 | |
|
|
121 | FREETMPS; |
|
|
122 | LEAVE; |
|
|
123 | |
|
|
124 | PUTBACK; |
|
|
125 | |
|
|
126 | return !!SvTRUE (ERRSV); |
|
|
127 | } |
91 | } |
128 | |
92 | |
129 | static void |
93 | static void |
130 | req_destroy (ETP_REQ *req) |
94 | req_destroy (ETP_REQ *req) |
131 | { |
95 | { |
… | |
… | |
136 | } |
100 | } |
137 | |
101 | |
138 | req_put (req->pool, req); |
102 | req_put (req->pool, req); |
139 | } |
103 | } |
140 | |
104 | |
|
|
105 | static int |
|
|
106 | req_finish (ETP_REQ *req) |
|
|
107 | { |
|
|
108 | dSP; |
|
|
109 | |
|
|
110 | if (ecb_expect_true (!ETP_CANCELLED (req))) |
|
|
111 | { |
|
|
112 | ENTER; |
|
|
113 | SAVETMPS; |
|
|
114 | PUSHMARK (SP); |
|
|
115 | |
|
|
116 | PUTBACK; |
|
|
117 | |
|
|
118 | if (req->handler->xstpreq_finish) |
|
|
119 | { |
|
|
120 | dTHX; |
|
|
121 | req->handler->xstpreq_finish (aTHX_ XSTPREQ (req)); |
|
|
122 | } |
|
|
123 | |
|
|
124 | call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD); |
|
|
125 | |
|
|
126 | SPAGAIN; |
|
|
127 | |
|
|
128 | FREETMPS; |
|
|
129 | LEAVE; |
|
|
130 | |
|
|
131 | PUTBACK; |
|
|
132 | } |
|
|
133 | |
|
|
134 | req_destroy (req); |
|
|
135 | |
|
|
136 | return !!SvTRUE (ERRSV); |
|
|
137 | } |
|
|
138 | |
141 | static void |
139 | static void |
142 | xstp_poll (pTHX_ CV *cv) |
140 | xstp_poll (pTHX_ CV *cv) |
143 | { |
141 | { |
144 | dXSARGS; |
142 | dXSARGS; |
145 | |
143 | |
… | |
… | |
163 | done_poll (void *userdata) |
161 | done_poll (void *userdata) |
164 | { |
162 | { |
165 | xsthreadpool self = (xsthreadpool)userdata; |
163 | xsthreadpool self = (xsthreadpool)userdata; |
166 | s_epipe_drain (&self->ep); |
164 | s_epipe_drain (&self->ep); |
167 | } |
165 | } |
168 | |
|
|
169 | #define xstp_fileno(self) s_epipe_fd (&self->ep) |
|
|
170 | |
166 | |
171 | static void |
167 | static void |
172 | xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems) |
168 | xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems) |
173 | { |
169 | { |
174 | if (nitems != 1) |
170 | if (nitems != 1) |
… | |
… | |
211 | |
207 | |
212 | static void |
208 | static void |
213 | xstp_burn_finish (pTHX_ NV *req) |
209 | xstp_burn_finish (pTHX_ NV *req) |
214 | { |
210 | { |
215 | dSP; |
211 | dSP; |
216 | printf ("push %f\n", *req);//D |
|
|
217 | XPUSHs (sv_2mortal (newSVnv (*req))); |
212 | XPUSHs (sv_2mortal (newSVnv (*req))); |
218 | PUTBACK; |
213 | PUTBACK; |
219 | } |
214 | } |
|
|
215 | |
|
|
216 | #define xstp_fileno(self) s_epipe_fd (&self->ep) |
|
|
217 | #define xstp_nreqs(self) etp_nreqs (&self->etp) |
|
|
218 | #define xstp_nready(self) etp_nready (&self->etp) |
|
|
219 | #define xstp_npending(self) etp_npending (&self->etp) |
|
|
220 | #define xstp_nthreads(self) etp_nthreads (&self->etp) |
|
|
221 | |
|
|
222 | #define xstp_max_poll_time(self,seconds) etp_set_max_poll_time (&self->etp, seconds) |
|
|
223 | #define xstp_max_poll_reqs(self,maxreqs) etp_set_max_poll_reqs (&self->etp, maxreqs) |
|
|
224 | #define xstp_idle_timeout(self,seconds) etp_set_idle_timeout (&self->etp, seconds) |
|
|
225 | #define xstp_max_idle(self,threads) etp_set_max_idle (&self->etp, threads) |
|
|
226 | #define xstp_min_parallel(self,threads) etp_set_min_parallel (&self->etp, threads) |
|
|
227 | #define xstp_max_parallel(self,threads) etp_set_max_parallel (&self->etp, threads) |
220 | |
228 | |
221 | MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_ |
229 | MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_ |
222 | |
230 | |
223 | PROTOTYPES: DISABLE |
231 | PROTOTYPES: DISABLE |
224 | |
232 | |
… | |
… | |
242 | _destroy (xsthreadpool self) |
250 | _destroy (xsthreadpool self) |
243 | CODE: |
251 | CODE: |
244 | //TODO: drain |
252 | //TODO: drain |
245 | s_epipe_destroy (&self->ep); |
253 | s_epipe_destroy (&self->ep); |
246 | |
254 | |
247 | int |
|
|
248 | xstp_fileno (xsthreadpool self) |
255 | int xstp_fileno (xsthreadpool self) |
|
|
256 | |
|
|
257 | unsigned int xstp_nreqs (xsthreadpool self) |
|
|
258 | |
|
|
259 | unsigned int xstp_nready (xsthreadpool self) |
|
|
260 | |
|
|
261 | unsigned int xstp_npending (xsthreadpool self) |
|
|
262 | |
|
|
263 | unsigned int xstp_nthreads (xsthreadpool self) |
|
|
264 | |
|
|
265 | void xstp_max_poll_time (xsthreadpool self, double seconds) |
|
|
266 | |
|
|
267 | void xstp_max_poll_reqs (xsthreadpool self, unsigned int maxreqs) |
|
|
268 | |
|
|
269 | void xstp_idle_timeout (xsthreadpool self, unsigned int seconds) |
|
|
270 | |
|
|
271 | void xstp_max_idle (xsthreadpool self, unsigned int threads) |
|
|
272 | |
|
|
273 | void xstp_min_parallel (xsthreadpool self, unsigned int nthreads) |
|
|
274 | |
|
|
275 | void xstp_max_parallel (xsthreadpool self, unsigned int nthreads) |
249 | |
276 | |
250 | void |
277 | void |
251 | req (xsthreadpool self, SV *type, ...) |
278 | req (xsthreadpool self, SV *type, ...) |
252 | PROTOTYPE: $$@ |
279 | PROTOTYPE: $$@ |
253 | PPCODE: |
280 | PPCODE: |
… | |
… | |
263 | croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),"); |
290 | croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),"); |
264 | |
291 | |
265 | req_alloc (self, handler->req_data_size); |
292 | req_alloc (self, handler->req_data_size); |
266 | req = self->freelist; |
293 | req = self->freelist; |
267 | |
294 | |
268 | printf ("req %p\n",req);//D |
|
|
269 | |
|
|
270 | req->callback = s_get_cv_croak (ST (items - 1)); |
295 | req->callback = s_get_cv_croak (ST (items - 1)); |
271 | |
296 | |
272 | req->handler = handler; |
297 | req->handler = handler; |
273 | /* if this croaks, we haven't lost anything */ |
298 | /* if this croaks, we haven't lost anything */ |
274 | handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3); |
299 | handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3); |