ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-XSThreadPool/XSThreadPool.xs
(Generate patch)

Comparing AnyEvent-XSThreadPool/XSThreadPool.xs (file contents):
Revision 1.1 by root, Thu Jun 25 21:08:51 2015 UTC vs.
Revision 1.2 by root, Thu Jun 25 21:24:18 2015 UTC

22 22
23#include "etp.h" 23#include "etp.h"
24 24
25static int req_finish (ETP_REQ *req); 25static int req_finish (ETP_REQ *req);
26#define ETP_FINISH(req) req_finish (req) 26#define ETP_FINISH(req) req_finish (req)
27static void req_destroy (ETP_REQ *req);
28#define ETP_DESTROY(req) req_destroy (req);
29#define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req)) 27#define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req))
30#define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req)) 28#define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req))
31 29
32#include "etp.c" 30#include "etp.c"
33 31
87 else 85 else
88 { 86 {
89 req->grp_next = self->freelist; 87 req->grp_next = self->freelist;
90 self->freelist = req; 88 self->freelist = req;
91 self->freesize += req->allocated; 89 self->freesize += req->allocated;
92
93 printf ("freelist now %d\n", self->freesize);//D
94 } 90 }
95}
96
97static int
98req_finish (ETP_REQ *req)
99{
100 dSP;
101
102 if (ecb_expect_false (ETP_CANCELLED (req)))
103 return 0;
104
105 ENTER;
106 SAVETMPS;
107 PUSHMARK (SP);
108
109 PUTBACK;
110
111 if (req->handler->xstpreq_finish)
112 {
113 dTHX;
114 req->handler->xstpreq_finish (aTHX_ XSTPREQ (req));
115 }
116
117 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
118
119 SPAGAIN;
120
121 FREETMPS;
122 LEAVE;
123
124 PUTBACK;
125
126 return !!SvTRUE (ERRSV);
127} 91}
128 92
129static void 93static void
130req_destroy (ETP_REQ *req) 94req_destroy (ETP_REQ *req)
131{ 95{
136 } 100 }
137 101
138 req_put (req->pool, req); 102 req_put (req->pool, req);
139} 103}
140 104
105static int
106req_finish (ETP_REQ *req)
107{
108 dSP;
109
110 if (ecb_expect_true (!ETP_CANCELLED (req)))
111 {
112 ENTER;
113 SAVETMPS;
114 PUSHMARK (SP);
115
116 PUTBACK;
117
118 if (req->handler->xstpreq_finish)
119 {
120 dTHX;
121 req->handler->xstpreq_finish (aTHX_ XSTPREQ (req));
122 }
123
124 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
125
126 SPAGAIN;
127
128 FREETMPS;
129 LEAVE;
130
131 PUTBACK;
132 }
133
134 req_destroy (req);
135
136 return !!SvTRUE (ERRSV);
137}
138
141static void 139static void
142xstp_poll (pTHX_ CV *cv) 140xstp_poll (pTHX_ CV *cv)
143{ 141{
144 dXSARGS; 142 dXSARGS;
145 143
163done_poll (void *userdata) 161done_poll (void *userdata)
164{ 162{
165 xsthreadpool self = (xsthreadpool)userdata; 163 xsthreadpool self = (xsthreadpool)userdata;
166 s_epipe_drain (&self->ep); 164 s_epipe_drain (&self->ep);
167} 165}
168
169#define xstp_fileno(self) s_epipe_fd (&self->ep)
170 166
171static void 167static void
172xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems) 168xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems)
173{ 169{
174 if (nitems != 1) 170 if (nitems != 1)
211 207
212static void 208static void
213xstp_burn_finish (pTHX_ NV *req) 209xstp_burn_finish (pTHX_ NV *req)
214{ 210{
215 dSP; 211 dSP;
216 printf ("push %f\n", *req);//D
217 XPUSHs (sv_2mortal (newSVnv (*req))); 212 XPUSHs (sv_2mortal (newSVnv (*req)));
218 PUTBACK; 213 PUTBACK;
219} 214}
215
216#define xstp_fileno(self) s_epipe_fd (&self->ep)
217#define xstp_nreqs(self) etp_nreqs (&self->etp)
218#define xstp_nready(self) etp_nready (&self->etp)
219#define xstp_npending(self) etp_npending (&self->etp)
220#define xstp_nthreads(self) etp_nthreads (&self->etp)
221
222#define xstp_max_poll_time(self,seconds) etp_set_max_poll_time (&self->etp, seconds)
223#define xstp_max_poll_reqs(self,maxreqs) etp_set_max_poll_reqs (&self->etp, maxreqs)
224#define xstp_idle_timeout(self,seconds) etp_set_idle_timeout (&self->etp, seconds)
225#define xstp_max_idle(self,threads) etp_set_max_idle (&self->etp, threads)
226#define xstp_min_parallel(self,threads) etp_set_min_parallel (&self->etp, threads)
227#define xstp_max_parallel(self,threads) etp_set_max_parallel (&self->etp, threads)
220 228
221MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_ 229MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_
222 230
223PROTOTYPES: DISABLE 231PROTOTYPES: DISABLE
224 232
242_destroy (xsthreadpool self) 250_destroy (xsthreadpool self)
243 CODE: 251 CODE:
244 //TODO: drain 252 //TODO: drain
245 s_epipe_destroy (&self->ep); 253 s_epipe_destroy (&self->ep);
246 254
247int
248xstp_fileno (xsthreadpool self) 255int xstp_fileno (xsthreadpool self)
256
257unsigned int xstp_nreqs (xsthreadpool self)
258
259unsigned int xstp_nready (xsthreadpool self)
260
261unsigned int xstp_npending (xsthreadpool self)
262
263unsigned int xstp_nthreads (xsthreadpool self)
264
265void xstp_max_poll_time (xsthreadpool self, double seconds)
266
267void xstp_max_poll_reqs (xsthreadpool self, unsigned int maxreqs)
268
269void xstp_idle_timeout (xsthreadpool self, unsigned int seconds)
270
271void xstp_max_idle (xsthreadpool self, unsigned int threads)
272
273void xstp_min_parallel (xsthreadpool self, unsigned int nthreads)
274
275void xstp_max_parallel (xsthreadpool self, unsigned int nthreads)
249 276
250void 277void
251req (xsthreadpool self, SV *type, ...) 278req (xsthreadpool self, SV *type, ...)
252 PROTOTYPE: $$@ 279 PROTOTYPE: $$@
253 PPCODE: 280 PPCODE:
263 croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),"); 290 croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),");
264 291
265 req_alloc (self, handler->req_data_size); 292 req_alloc (self, handler->req_data_size);
266 req = self->freelist; 293 req = self->freelist;
267 294
268 printf ("req %p\n",req);//D
269
270 req->callback = s_get_cv_croak (ST (items - 1)); 295 req->callback = s_get_cv_croak (ST (items - 1));
271 296
272 req->handler = handler; 297 req->handler = handler;
273 /* if this croaks, we haven't lost anything */ 298 /* if this croaks, we haven't lost anything */
274 handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3); 299 handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines