#include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include #include "schmorp.h" #include "xsthreadpool.h" typedef struct xsthreadpool *xsthreadpool; typedef struct xsthreadpool_request_type *req_type; /* to reduce typing */ #define ETP_REQ_MEMBERS \ int allocated; \ xsthreadpool pool; \ req_type handler; \ SV *callback; #include "xthread.h" #include "ecb.h" #include "etp.h" static int req_finish (ETP_REQ *req); #define ETP_FINISH(req) req_finish (req) static void req_destroy (ETP_REQ *req); #define ETP_DESTROY(req) req_destroy (req); #define ETP_EXECUTE(wrk,req) req->handler->xstpreq_execute (XSTPREQ (req)) #define XSTPREQ(req) (void *)(1 + (ETP_REQ *)(req)) #include "etp.c" /* cache requests blocks up to this size */ #define MAX_FREESIZE 10240 struct xsthreadpool { struct etp_pool etp; s_epipe ep; ETP_REQ *freelist; int freesize; }; static void req_alloc (xsthreadpool self, int size) { ETP_REQ *req = self->freelist; size += sizeof (*req); if (ecb_expect_true (req)) { if (ecb_expect_true (req->allocated >= size)) return; free (req); } /* we assume that the ETP_REQ is suitably aligned for everything */ req = malloc (size); /* do one-time request initialisation here */ req->allocated = size; req->pool = self; /* TODO: should somehow be found via worker->et_pool */ req->type = ETP_TYPE_START; req->grp_next = self->freelist; self->freelist = req; self->freesize += size; } static void req_pop (xsthreadpool self) { ETP_REQ *req = self->freelist; self->freesize -= req->allocated; self->freelist = req->grp_next; } static void req_put (xsthreadpool self, ETP_REQ *req) { if (ecb_expect_false (self->freesize > MAX_FREESIZE)) free (req); else { req->grp_next = self->freelist; self->freelist = req; self->freesize += req->allocated; printf ("freelist now %d\n", self->freesize);//D } } static int req_finish (ETP_REQ *req) { dSP; if (ecb_expect_false (ETP_CANCELLED (req))) return 0; ENTER; SAVETMPS; PUSHMARK (SP); PUTBACK; if (req->handler->xstpreq_finish) { dTHX; req->handler->xstpreq_finish (aTHX_ XSTPREQ (req)); } call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD); SPAGAIN; FREETMPS; LEAVE; PUTBACK; return !!SvTRUE (ERRSV); } static void req_destroy (ETP_REQ *req) { if (req->handler->xstpreq_destroy) { dTHX; req->handler->xstpreq_destroy (aTHX_ XSTPREQ (req)); } req_put (req->pool, req); } static void xstp_poll (pTHX_ CV *cv) { dXSARGS; xsthreadpool self = (xsthreadpool)S_GENSUB_ARG; printf ("poll\n"); if (etp_poll (&self->etp) > 0) croak (0); XSRETURN (0); } static void want_poll (void *userdata) { xsthreadpool self = (xsthreadpool)userdata; s_epipe_signal (&self->ep); } static void done_poll (void *userdata) { xsthreadpool self = (xsthreadpool)userdata; s_epipe_drain (&self->ep); } #define xstp_fileno(self) s_epipe_fd (&self->ep) static void xstp_sleep_prepare (pTHX_ NV *req, SV **items, int nitems) { if (nitems != 1) croak ("sleep/burn request takes exactly one argument, a fractional number of seconds"); *req = SvNV (items [0]); printf ("parse %f\n", *req); } static void xstp_sleep_execute (pTHX_ NV *req) { printf ("exec %f\n", *req); /* select is provided by perl */ struct timeval tv = { (int)*req, (*req - (int)*req) * 1e6 }; select (0, 0, 0, 0, &tv); } static NV gettod (void) { struct timeval tv; gettimeofday (&tv, 0); return tv.tv_sec + tv.tv_usec * 1e-6; } static void xstp_burn_execute (NV *req) { NV count = 0; NV end = gettod () + *req; while (gettod () < end) ++count; *req = count; } static void xstp_burn_finish (pTHX_ NV *req) { dSP; printf ("push %f\n", *req);//D XPUSHs (sv_2mortal (newSVnv (*req))); PUTBACK; } MODULE = AnyEvent::XSThreadPool PACKAGE = AnyEvent::XSThreadPool PREFIX = xstp_ PROTOTYPES: DISABLE BOOT: XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "sleep", NV, xstp_sleep_prepare, xstp_sleep_execute, 0, 0); XSTHREADPOOL_REQUEST_TYPE (CvSTASH (cv), "burn", NV, xstp_sleep_prepare, xstp_burn_execute, xstp_burn_finish, 0); SV * _init (SV *sv) PPCODE: { xsthreadpool self = (xsthreadpool)SvGROW (sv, sizeof (*self)); memset (self, 0, sizeof (*self)); if (s_epipe_new (&self->ep)) croak ("AnyEvent::XSThreadPool::new: unable to initialise event pipe,"); etp_init (&self->etp, self, want_poll, done_poll); XPUSHs (sv_2mortal (s_gensub (xstp_poll, self))); } void _destroy (xsthreadpool self) CODE: //TODO: drain s_epipe_destroy (&self->ep); int xstp_fileno (xsthreadpool self) void req (xsthreadpool self, SV *type, ...) PROTOTYPE: $$@ PPCODE: { ETP_REQ *req; req_type handler = (req_type)SvPVX (type); if (SvCUR (type) != sizeof (*handler) || handler->magic1 != XSTHREADPOOL_MAGIC1 || handler->magic2 != XSTHREADPOOL_MAGIC2) croak ("AnyEvent::XSThreadPool::req: passed request type invalid, corrupted, or wrong version,"); if (items < 3) croak ("AnyEvent::XSThreadPool::req: must have at least three arguments ($threadpool, $request_type, ..., $callback),"); req_alloc (self, handler->req_data_size); req = self->freelist; printf ("req %p\n",req);//D req->callback = s_get_cv_croak (ST (items - 1)); req->handler = handler; /* if this croaks, we haven't lost anything */ handler->xstpreq_prepare (aTHX_ XSTPREQ (req), &ST(2), items - 3); /* after this, we can't croak without cleaning up */ req_pop (self); SvREFCNT_inc_NN (req->callback); etp_submit (&self->etp, req); }