ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-XSThreadPool/XSThreadPool.xs
Revision: 1.1
Committed: Thu Jun 25 21:08:51 2015 UTC (8 years, 11 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #include "EXTERN.h"
2     #include "perl.h"
3     #include "XSUB.h"
4    
5     #include <sys/time.h>
6    
7     #include "schmorp.h"
8    
9     #include "xsthreadpool.h"
10    
11     typedef struct xsthreadpool *xsthreadpool;
12     typedef struct xsthreadpool_request_type *req_type; /* to reduce typing */
13    
14     #define ETP_REQ_MEMBERS \
15     int allocated; \
16     xsthreadpool pool; \
17     req_type handler; \
18     SV *callback;
19    
20     #include "xthread.h"
21     #include "ecb.h"
22    
23     #include "etp.h"
24    
25     static int req_finish (ETP_REQ *req);
26     #define ETP_FINISH(req) req_finish (req)
27     static void req_destroy (ETP_REQ *req);
28     #define ETP_DESTROY(req) req_destroy (req);
29     #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req))
30     #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req))
31    
32     #include "etp.c"
33    
34     /* cache requests blocks up to this size */
35     #define MAX_FREESIZE 10240
36    
/* per-pool state; _init places it inside the string buffer of a perl scalar */
struct xsthreadpool
{
  struct etp_pool etp; /* etp state, handed to etp_init, etp_poll and etp_submit */
  s_epipe ep;          /* event pipe: signalled by want_poll, drained by done_poll */
  ETP_REQ *freelist;   /* cached request blocks, singly linked through grp_next */
  int freesize;        /* sum of ->allocated over all blocks on the freelist */
};
44    
45     static void
46     req_alloc (xsthreadpool self, int size)
47     {
48     ETP_REQ *req = self->freelist;
49    
50     size += sizeof (*req);
51    
52     if (ecb_expect_true (req))
53     {
54     if (ecb_expect_true (req->allocated >= size))
55     return;
56    
57     free (req);
58     }
59    
60     /* we assume that the ETP_REQ is suitably aligned for everything */
61     req = malloc (size);
62    
63     /* do one-time request initialisation here */
64     req->allocated = size;
65     req->pool = self; /* TODO: should somehow be found via worker->et_pool */
66     req->type = ETP_TYPE_START;
67    
68     req->grp_next = self->freelist;
69     self->freelist = req;
70     self->freesize += size;
71     }
72    
73     static void
74     req_pop (xsthreadpool self)
75     {
76     ETP_REQ *req = self->freelist;
77    
78     self->freesize -= req->allocated;
79     self->freelist = req->grp_next;
80     }
81    
82     static void
83     req_put (xsthreadpool self, ETP_REQ *req)
84     {
85     if (ecb_expect_false (self->freesize > MAX_FREESIZE))
86     free (req);
87     else
88     {
89     req->grp_next = self->freelist;
90     self->freelist = req;
91     self->freesize += req->allocated;
92    
93     printf ("freelist now %d\n", self->freesize);//D
94     }
95     }
96    
97     static int
98     req_finish (ETP_REQ *req)
99     {
100     dSP;
101    
102     if (ecb_expect_false (ETP_CANCELLED (req)))
103     return 0;
104    
105     ENTER;
106     SAVETMPS;
107     PUSHMARK (SP);
108    
109     PUTBACK;
110    
111     if (req->handler->xstpreq_finish)
112     {
113     dTHX;
114     req->handler->xstpreq_finish (aTHX_ XSTPREQ (req));
115     }
116    
117     call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD);
118    
119     SPAGAIN;
120    
121     FREETMPS;
122     LEAVE;
123    
124     PUTBACK;
125    
126     return !!SvTRUE (ERRSV);
127     }
128    
129     static void
130     req_destroy (ETP_REQ *req)
131     {
132     if (req->handler->xstpreq_destroy)
133     {
134     dTHX;
135     req->handler->xstpreq_destroy (aTHX_ XSTPREQ (req));
136     }
137    
138     req_put (req->pool, req);
139     }
140    
141     static void
142     xstp_poll (pTHX_ CV *cv)
143     {
144     dXSARGS;
145    
146     xsthreadpool self = (xsthreadpool)S_GENSUB_ARG;
147    
148     printf ("poll\n");
149     if (etp_poll (&self->etp) > 0)
150     croak (0);
151    
152     XSRETURN (0);
153     }
154    
155     static void
156     want_poll (void *userdata)
157     {
158     xsthreadpool self = (xsthreadpool)userdata;
159     s_epipe_signal (&self->ep);
160     }
161    
162     static void
163     done_poll (void *userdata)
164     {
165     xsthreadpool self = (xsthreadpool)userdata;
166     s_epipe_drain (&self->ep);
167     }
168    
/* file descriptor of the pool's event pipe; backs the "fileno" XSUB below */
#define xstp_fileno(self) s_epipe_fd (&(self)->ep)
170    
171     static void
172     xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems)
173     {
174     if (nitems != 1)
175     croak ("sleep/burn request takes exactly one argument, a fractional number of seconds");
176    
177     *req = SvNV (items [0]);
178    
179     printf ("parse %f\n", *req);
180     }
181    
182     static void
183     xstp_sleep_execute (pTHX_ NV *req)
184     {
185     printf ("exec %f\n", *req);
186     /* select is provided by perl */
187     struct timeval tv = { (int)*req, (*req - (int)*req) * 1e6 };
188     select (0, 0, 0, 0, &tv);
189     }
190    
191     static NV
192     gettod (void)
193     {
194     struct timeval tv;
195     gettimeofday (&tv, 0);
196    
197     return tv.tv_sec + tv.tv_usec * 1e-6;
198     }
199    
200     static void
201     xstp_burn_execute (NV *req)
202     {
203     NV count = 0;
204     NV end = gettod () + *req;
205    
206     while (gettod () < end)
207     ++count;
208    
209     *req = count;
210     }
211    
212     static void
213     xstp_burn_finish (pTHX_ NV *req)
214     {
215     dSP;
216     printf ("push %f\n", *req);//D
217     XPUSHs (sv_2mortal (newSVnv (*req)));
218     PUTBACK;
219     }
220    
221     MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_
222    
223     PROTOTYPES: DISABLE
224    
BOOT:
        /* register the built-in request types; the trailing arguments are presumably */
        /* the prepare, execute, finish and destroy handlers, with 0 meaning "none" - */
        /* TODO confirm against xsthreadpool.h */
        XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "sleep", NV, xstp_sleep_prepare, xstp_sleep_execute, 0, 0);
        XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "burn", NV, xstp_sleep_prepare, xstp_burn_execute, xstp_burn_finish, 0);
228    
229     SV *
230     _init (SV *sv)
231     PPCODE:
232     {
233     xsthreadpool self = (xsthreadpool)SvGROW (sv, sizeof (*self));
234     memset (self, 0, sizeof (*self));
235     if (s_epipe_new (&self->ep))
236     croak ("AnyEvent::XSThreadPool::new: unable to initialise event pipe,");
237     etp_init (&self->etp, self, want_poll, done_poll);
238     XPUSHs (sv_2mortal (s_gensub (xstp_poll, self)));
239     }
240    
241     void
242     _destroy (xsthreadpool self)
243     CODE:
244     //TODO: drain
245     s_epipe_destroy (&self->ep);
246    
# Returns the file descriptor of the pool's event pipe (see the xstp_fileno macro).
int
xstp_fileno (xsthreadpool self)
249    
250     void
251     req (xsthreadpool self, SV *type, ...)
252     PROTOTYPE: $$@
253     PPCODE:
254     {
255     ETP_REQ *req;
256     req_type handler = (req_type)SvPVX (type);
257     if (SvCUR (type) != sizeof (*handler)
258     || handler->magic1 != XSTHREADPOOL_MAGIC1
259     || handler->magic2 != XSTHREADPOOL_MAGIC2)
260     croak ("AnyEvent::XSThreadPool::req: passed request type invalid, corrupted, or wrong version,");
261    
262     if (items < 3)
263     croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),");
264    
265     req_alloc (self, handler->req_data_size);
266     req = self->freelist;
267    
268     printf ("req %p\n",req);//D
269    
270     req->callback = s_get_cv_croak (ST (items - 1));
271    
272     req->handler = handler;
273     /* if this croaks, we haven't lost anything */
274     handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3);
275    
276     /* after this, we can't croak without cleaning up */
277     req_pop (self);
278    
279     SvREFCNT_inc_NN (req->callback);
280    
281     etp_submit (&self->etp, req);
282     }
283