ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-XSThreadPool/XSThreadPool.xs
Revision: 1.2
Committed: Thu Jun 25 21:24:18 2015 UTC (9 years ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +63 -38 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/time.h>
6    
7     #include "schmorp.h"
8    
9     #include "xsthreadpool.h"
10    
11     typedef struct xsthreadpool *xsthreadpool;
12     typedef struct xsthreadpool_request_type *req_type; /* to reduce typing */
13    
14     #define ETP_REQ_MEMBERS \
15     int allocated; \
16     xsthreadpool pool; \
17     req_type handler; \
18     SV *callback;
19    
20     #include "xthread.h"
21     #include "ecb.h"
22    
23     #include "etp.h"
24    
25     static int req_finish (ETP_REQ *req);
26     #define ETP_FINISH(req) req_finish (req)
27     #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req))
28     #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req))
29    
30     #include "etp.c"
31    
32     /* cache requests blocks up to this size */
33     #define MAX_FREESIZE 10240
34    
35     struct xsthreadpool
36     {
37     struct etp_pool etp;
38     s_epipe ep;
39     ETP_REQ *freelist;
40     int freesize;
41     };
42    
43     static void
44     req_alloc (xsthreadpool self, int size)
45     {
46     ETP_REQ *req = self->freelist;
47    
48     size += sizeof (*req);
49    
50     if (ecb_expect_true (req))
51     {
52     if (ecb_expect_true (req->allocated >= size))
53     return;
54    
55     free (req);
56     }
57    
58     /* we assume that the ETP_REQ is suitably aligned for everything */
59     req = malloc (size);
60    
61     /* do one-time request initialisation here */
62     req->allocated = size;
63     req->pool = self; /* TODO: should somehow be found via worker->et_pool */
64     req->type = ETP_TYPE_START;
65    
66     req->grp_next = self->freelist;
67     self->freelist = req;
68     self->freesize += size;
69     }
70    
71     static void
72     req_pop (xsthreadpool self)
73     {
74     ETP_REQ *req = self->freelist;
75    
76     self->freesize -= req->allocated;
77     self->freelist = req->grp_next;
78     }
79    
80     static void
81     req_put (xsthreadpool self, ETP_REQ *req)
82     {
83     if (ecb_expect_false (self->freesize > MAX_FREESIZE))
84     free (req);
85     else
86     {
87     req->grp_next = self->freelist;
88     self->freelist = req;
89     self->freesize += req->allocated;
90 root 1.2 }
91     }
92 root 1.1
93 root 1.2 static void
94     req_destroy (ETP_REQ *req)
95     {
96     if (req->handler->xstpreq_destroy)
97     {
98     dTHX;
99     req->handler->xstpreq_destroy (aTHX_ XSTPREQ (req));
100 root 1.1 }
101 root 1.2
102     req_put (req->pool, req);
103 root 1.1 }
104    
105     static int
106     req_finish (ETP_REQ *req)
107     {
108     dSP;
109    
110 root 1.2 if (ecb_expect_true (!ETP_CANCELLED (req)))
111     {
112     ENTER;
113     SAVETMPS;
114     PUSHMARK (SP);
115 root 1.1
116 root 1.2 PUTBACK;
117 root 1.1
118 root 1.2 if (req->handler->xstpreq_finish)
119     {
120     dTHX;
121     req->handler->xstpreq_finish (aTHX_ XSTPREQ (req));
122     }
123 root 1.1
124 root 1.2 call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
125 root 1.1
126 root 1.2 SPAGAIN;
127 root 1.1
128 root 1.2 FREETMPS;
129     LEAVE;
130 root 1.1
131 root 1.2 PUTBACK;
132     }
133 root 1.1
134 root 1.2 req_destroy (req);
135 root 1.1
136     return !!SvTRUE (ERRSV);
137     }
138    
139     static void
140     xstp_poll (pTHX_ CV *cv)
141     {
142     dXSARGS;
143    
144     xsthreadpool self = (xsthreadpool)S_GENSUB_ARG;
145    
146     printf ("poll\n");
147     if (etp_poll (&self->etp) > 0)
148     croak (0);
149    
150     XSRETURN (0);
151     }
152    
153     static void
154     want_poll (void *userdata)
155     {
156     xsthreadpool self = (xsthreadpool)userdata;
157     s_epipe_signal (&self->ep);
158     }
159    
160     static void
161     done_poll (void *userdata)
162     {
163     xsthreadpool self = (xsthreadpool)userdata;
164     s_epipe_drain (&self->ep);
165     }
166    
167     static void
168     xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems)
169     {
170     if (nitems != 1)
171     croak ("sleep/burn request takes exactly one argument, a fractional number of seconds");
172    
173     *req = SvNV (items [0]);
174    
175     printf ("parse %f\n", *req);
176     }
177    
178     static void
179     xstp_sleep_execute (pTHX_ NV *req)
180     {
181     printf ("exec %f\n", *req);
182     /* select is provided by perl */
183     struct timeval tv = { (int)*req, (*req - (int)*req) * 1e6 };
184     select (0, 0, 0, 0, &tv);
185     }
186    
187     static NV
188     gettod (void)
189     {
190     struct timeval tv;
191     gettimeofday (&tv, 0);
192    
193     return tv.tv_sec + tv.tv_usec * 1e-6;
194     }
195    
196     static void
197     xstp_burn_execute (NV *req)
198     {
199     NV count = 0;
200     NV end = gettod () + *req;
201    
202     while (gettod () < end)
203     ++count;
204    
205     *req = count;
206     }
207    
208     static void
209     xstp_burn_finish (pTHX_ NV *req)
210     {
211     dSP;
212     XPUSHs (sv_2mortal (newSVnv (*req)));
213     PUTBACK;
214     }
215    
216 root 1.2 #define xstp_fileno(self) s_epipe_fd (&self->ep)
217     #define xstp_nreqs(self) etp_nreqs (&self->etp)
218     #define xstp_nready(self) etp_nready (&self->etp)
219     #define xstp_npending(self) etp_npending (&self->etp)
220     #define xstp_nthreads(self) etp_nthreads (&self->etp)
221    
222     #define xstp_max_poll_time(self,seconds) etp_set_max_poll_time (&self->etp, seconds)
223     #define xstp_max_poll_reqs(self,maxreqs) etp_set_max_poll_reqs (&self->etp, maxreqs)
224     #define xstp_idle_timeout(self,seconds) etp_set_idle_timeout (&self->etp, seconds)
225     #define xstp_max_idle(self,threads) etp_set_max_idle (&self->etp, threads)
226     #define xstp_min_parallel(self,threads) etp_set_min_parallel (&self->etp, threads)
227     #define xstp_max_parallel(self,threads) etp_set_max_parallel (&self->etp, threads)
228    
229 root 1.1 MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_
230    
231     PROTOTYPES: DISABLE
232    
233     BOOT:
234     XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "sleep", NV, xstp_sleep_prepare, xstp_sleep_execute, 0, 0);
235     XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "burn", NV, xstp_sleep_prepare, xstp_burn_execute, xstp_burn_finish, 0);
236    
237     SV *
238     _init (SV *sv)
239     PPCODE:
240     {
241     xsthreadpool self = (xsthreadpool)SvGROW (sv, sizeof (*self));
242     memset (self, 0, sizeof (*self));
243     if (s_epipe_new (&self->ep))
244     croak ("AnyEvent::XSThreadPool::new: unable to initialise event pipe,");
245     etp_init (&self->etp, self, want_poll, done_poll);
246     XPUSHs (sv_2mortal (s_gensub (xstp_poll, self)));
247     }
248    
249     void
250     _destroy (xsthreadpool self)
251     CODE:
252     //TODO: drain
253     s_epipe_destroy (&self->ep);
254    
255 root 1.2 int xstp_fileno (xsthreadpool self)
256    
257     unsigned int xstp_nreqs (xsthreadpool self)
258    
259     unsigned int xstp_nready (xsthreadpool self)
260    
261     unsigned int xstp_npending (xsthreadpool self)
262    
263     unsigned int xstp_nthreads (xsthreadpool self)
264    
265     void xstp_max_poll_time (xsthreadpool self, double seconds)
266    
267     void xstp_max_poll_reqs (xsthreadpool self, unsigned int maxreqs)
268    
269     void xstp_idle_timeout (xsthreadpool self, unsigned int seconds)
270    
271     void xstp_max_idle (xsthreadpool self, unsigned int threads)
272    
273     void xstp_min_parallel (xsthreadpool self, unsigned int nthreads)
274    
275     void xstp_max_parallel (xsthreadpool self, unsigned int nthreads)
276 root 1.1
277     void
278     req (xsthreadpool self, SV *type, ...)
279     PROTOTYPE: $$@
280     PPCODE:
281     {
282     ETP_REQ *req;
283     req_type handler = (req_type)SvPVX (type);
284     if (SvCUR (type) != sizeof (*handler)
285     || handler->magic1 != XSTHREADPOOL_MAGIC1
286     || handler->magic2 != XSTHREADPOOL_MAGIC2)
287     croak ("AnyEvent::XSThreadPool::req: passed request type invalid, corrupted, or wrong version,");
288    
289     if (items < 3)
290     croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),");
291    
292     req_alloc (self, handler->req_data_size);
293     req = self->freelist;
294    
295     req->callback = s_get_cv_croak (ST (items - 1));
296    
297     req->handler = handler;
298     /* if this croaks, we haven't lost anything */
299     handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3);
300    
301     /* after this, we can't croak without cleaning up */
302     req_pop (self);
303    
304     SvREFCNT_inc_NN (req->callback);
305    
306     etp_submit (&self->etp, req);
307     }
308