ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-XSThreadPool/XSThreadPool.xs
Revision: 1.2
Committed: Thu Jun 25 21:24:18 2015 UTC (8 years, 10 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +63 -38 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 #include <sys/time.h>
6
7 #include "schmorp.h"
8
9 #include "xsthreadpool.h"
10
11 typedef struct xsthreadpool *xsthreadpool;
12 typedef struct xsthreadpool_request_type *req_type; /* to reduce typing */
13
14 #define ETP_REQ_MEMBERS \
15 int allocated; \
16 xsthreadpool pool; \
17 req_type handler; \
18 SV *callback;
19
20 #include "xthread.h"
21 #include "ecb.h"
22
23 #include "etp.h"
24
25 static int req_finish (ETP_REQ *req);
26 #define ETP_FINISH(req) req_finish (req)
27 #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req))
28 #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req))
29
30 #include "etp.c"
31
32 /* cache requests blocks up to this size */
33 #define MAX_FREESIZE 10240
34
35 struct xsthreadpool
36 {
37 struct etp_pool etp;
38 s_epipe ep;
39 ETP_REQ *freelist;
40 int freesize;
41 };
42
43 static void
44 req_alloc (xsthreadpool self, int size)
45 {
46 ETP_REQ *req = self->freelist;
47
48 size += sizeof (*req);
49
50 if (ecb_expect_true (req))
51 {
52 if (ecb_expect_true (req->allocated >= size))
53 return;
54
55 free (req);
56 }
57
58 /* we assume that the ETP_REQ is suitably aligned for everything */
59 req = malloc (size);
60
61 /* do one-time request initialisation here */
62 req->allocated = size;
63 req->pool = self; /* TODO: should somehow be found via worker->et_pool */
64 req->type = ETP_TYPE_START;
65
66 req->grp_next = self->freelist;
67 self->freelist = req;
68 self->freesize += size;
69 }
70
71 static void
72 req_pop (xsthreadpool self)
73 {
74 ETP_REQ *req = self->freelist;
75
76 self->freesize -= req->allocated;
77 self->freelist = req->grp_next;
78 }
79
80 static void
81 req_put (xsthreadpool self, ETP_REQ *req)
82 {
83 if (ecb_expect_false (self->freesize > MAX_FREESIZE))
84 free (req);
85 else
86 {
87 req->grp_next = self->freelist;
88 self->freelist = req;
89 self->freesize += req->allocated;
90 }
91 }
92
93 static void
94 req_destroy (ETP_REQ *req)
95 {
96 if (req->handler->xstpreq_destroy)
97 {
98 dTHX;
99 req->handler->xstpreq_destroy (aTHX_ XSTPREQ (req));
100 }
101
102 req_put (req->pool, req);
103 }
104
105 static int
106 req_finish (ETP_REQ *req)
107 {
108 dSP;
109
110 if (ecb_expect_true (!ETP_CANCELLED (req)))
111 {
112 ENTER;
113 SAVETMPS;
114 PUSHMARK (SP);
115
116 PUTBACK;
117
118 if (req->handler->xstpreq_finish)
119 {
120 dTHX;
121 req->handler->xstpreq_finish (aTHX_ XSTPREQ (req));
122 }
123
124 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
125
126 SPAGAIN;
127
128 FREETMPS;
129 LEAVE;
130
131 PUTBACK;
132 }
133
134 req_destroy (req);
135
136 return !!SvTRUE (ERRSV);
137 }
138
139 static void
140 xstp_poll (pTHX_ CV *cv)
141 {
142 dXSARGS;
143
144 xsthreadpool self = (xsthreadpool)S_GENSUB_ARG;
145
146 printf ("poll\n");
147 if (etp_poll (&self->etp) > 0)
148 croak (0);
149
150 XSRETURN (0);
151 }
152
153 static void
154 want_poll (void *userdata)
155 {
156 xsthreadpool self = (xsthreadpool)userdata;
157 s_epipe_signal (&self->ep);
158 }
159
160 static void
161 done_poll (void *userdata)
162 {
163 xsthreadpool self = (xsthreadpool)userdata;
164 s_epipe_drain (&self->ep);
165 }
166
167 static void
168 xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems)
169 {
170 if (nitems != 1)
171 croak ("sleep/burn request takes exactly one argument, a fractional number of seconds");
172
173 *req = SvNV (items [0]);
174
175 printf ("parse %f\n", *req);
176 }
177
178 static void
179 xstp_sleep_execute (pTHX_ NV *req)
180 {
181 printf ("exec %f\n", *req);
182 /* select is provided by perl */
183 struct timeval tv = { (int)*req, (*req - (int)*req) * 1e6 };
184 select (0, 0, 0, 0, &tv);
185 }
186
187 static NV
188 gettod (void)
189 {
190 struct timeval tv;
191 gettimeofday (&tv, 0);
192
193 return tv.tv_sec + tv.tv_usec * 1e-6;
194 }
195
196 static void
197 xstp_burn_execute (NV *req)
198 {
199 NV count = 0;
200 NV end = gettod () + *req;
201
202 while (gettod () < end)
203 ++count;
204
205 *req = count;
206 }
207
208 static void
209 xstp_burn_finish (pTHX_ NV *req)
210 {
211 dSP;
212 XPUSHs (sv_2mortal (newSVnv (*req)));
213 PUTBACK;
214 }
215
216 #define xstp_fileno(self) s_epipe_fd (&self->ep)
217 #define xstp_nreqs(self) etp_nreqs (&self->etp)
218 #define xstp_nready(self) etp_nready (&self->etp)
219 #define xstp_npending(self) etp_npending (&self->etp)
220 #define xstp_nthreads(self) etp_nthreads (&self->etp)
221
222 #define xstp_max_poll_time(self,seconds) etp_set_max_poll_time (&self->etp, seconds)
223 #define xstp_max_poll_reqs(self,maxreqs) etp_set_max_poll_reqs (&self->etp, maxreqs)
224 #define xstp_idle_timeout(self,seconds) etp_set_idle_timeout (&self->etp, seconds)
225 #define xstp_max_idle(self,threads) etp_set_max_idle (&self->etp, threads)
226 #define xstp_min_parallel(self,threads) etp_set_min_parallel (&self->etp, threads)
227 #define xstp_max_parallel(self,threads) etp_set_max_parallel (&self->etp, threads)
228
229 MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_
230
231 PROTOTYPES: DISABLE
232
233 BOOT:
234 XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "sleep", NV, xstp_sleep_prepare, xstp_sleep_execute, 0, 0);
235 XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "burn", NV, xstp_sleep_prepare, xstp_burn_execute, xstp_burn_finish, 0);
236
237 SV *
238 _init (SV *sv)
239 PPCODE:
240 {
241 xsthreadpool self = (xsthreadpool)SvGROW (sv, sizeof (*self));
242 memset (self, 0, sizeof (*self));
243 if (s_epipe_new (&self->ep))
244 croak ("AnyEvent::XSThreadPool::new: unable to initialise event pipe,");
245 etp_init (&self->etp, self, want_poll, done_poll);
246 XPUSHs (sv_2mortal (s_gensub (xstp_poll, self)));
247 }
248
249 void
250 _destroy (xsthreadpool self)
251 CODE:
252 //TODO: drain
253 s_epipe_destroy (&self->ep);
254
255 int xstp_fileno (xsthreadpool self)
256
257 unsigned int xstp_nreqs (xsthreadpool self)
258
259 unsigned int xstp_nready (xsthreadpool self)
260
261 unsigned int xstp_npending (xsthreadpool self)
262
263 unsigned int xstp_nthreads (xsthreadpool self)
264
265 void xstp_max_poll_time (xsthreadpool self, double seconds)
266
267 void xstp_max_poll_reqs (xsthreadpool self, unsigned int maxreqs)
268
269 void xstp_idle_timeout (xsthreadpool self, unsigned int seconds)
270
271 void xstp_max_idle (xsthreadpool self, unsigned int threads)
272
273 void xstp_min_parallel (xsthreadpool self, unsigned int nthreads)
274
275 void xstp_max_parallel (xsthreadpool self, unsigned int nthreads)
276
277 void
278 req (xsthreadpool self, SV *type, ...)
279 PROTOTYPE: $$@
280 PPCODE:
281 {
282 ETP_REQ *req;
283 req_type handler = (req_type)SvPVX (type);
284 if (SvCUR (type) != sizeof (*handler)
285 || handler->magic1 != XSTHREADPOOL_MAGIC1
286 || handler->magic2 != XSTHREADPOOL_MAGIC2)
287 croak ("AnyEvent::XSThreadPool::req: passed request type invalid, corrupted, or wrong version,");
288
289 if (items < 3)
290 croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),");
291
292 req_alloc (self, handler->req_data_size);
293 req = self->freelist;
294
295 req->callback = s_get_cv_croak (ST (items - 1));
296
297 req->handler = handler;
298 /* if this croaks, we haven't lost anything */
299 handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3);
300
301 /* after this, we can't croak without cleaning up */
302 req_pop (self);
303
304 SvREFCNT_inc_NN (req->callback);
305
306 etp_submit (&self->etp, req);
307 }
308