--- libeio/etp.c 2015/06/25 17:05:07 1.4 +++ libeio/etp.c 2018/08/14 09:29:50 1.12 @@ -1,7 +1,7 @@ /* * libetp implementation * - * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann + * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -54,6 +54,13 @@ # define ETP_TYPE_GROUP 1 #endif +#ifndef ETP_WANT_POLL +# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata) +#endif +#ifndef ETP_DONE_POLL +# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata) +#endif + #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) #define ETP_TICKS ((1000000 + 1023) >> 10) @@ -71,25 +78,6 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } -static unsigned int started, idle, wanted = 4; - -static void (*want_poll_cb) (void); -static void (*done_poll_cb) (void); - -static unsigned int max_poll_time; /* reslock */ -static unsigned int max_poll_reqs; /* reslock */ - -static unsigned int nreqs; /* reqlock */ -static unsigned int nready; /* reqlock */ -static unsigned int npending; /* reqlock */ -static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */ -static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */ - -static xmutex_t wrklock; -static xmutex_t reslock; -static xmutex_t reqlock; -static xcond_t reqwait; - struct etp_tmpbuf { void *ptr; @@ -108,11 +96,26 @@ return buf->ptr; } +/* + * a somewhat faster data structure might be nice, but + * with 8 priorities this actually needs <20 insns + * per shift, the most expensive operation. + */ +typedef struct +{ + ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ + int size; +} etp_reqq; + +typedef struct etp_pool *etp_pool; + typedef struct etp_worker { + etp_pool pool; + struct etp_tmpbuf tmpbuf; - /* locked by wrklock */ + /* locked by pool->wrklock */ struct etp_worker *prev, *next; xthread_t tid; @@ -122,10 +125,37 @@ #endif } etp_worker; -static etp_worker wrk_first; /* NOT etp */ +struct etp_pool +{ + void *userdata; + + etp_reqq req_queue; + etp_reqq res_queue; -#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock) -#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock) + unsigned int started, idle, wanted; + + unsigned int max_poll_time; /* pool->reslock */ + unsigned int max_poll_reqs; /* pool->reslock */ + + unsigned int nreqs; /* pool->reqlock */ + unsigned int nready; /* pool->reqlock */ + unsigned int npending; /* pool->reqlock */ + unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */ + unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */ + + void (*want_poll_cb) (void *userdata); + void (*done_poll_cb) (void *userdata); + + xmutex_t wrklock; + xmutex_t reslock; + xmutex_t reqlock; + xcond_t reqwait; + + etp_worker wrk_first; +}; + +#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) +#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) /* worker threads management */ @@ -146,64 +176,51 @@ } ETP_API_DECL unsigned int -etp_nreqs (void) +etp_nreqs (etp_pool pool) { int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = nreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_nready (void) +etp_nready (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = nready; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nready; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_npending (void) +etp_npending (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = npending; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->npending; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_nthreads (void) +etp_nthreads (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = started; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->started; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } -/* - * a somewhat faster data structure might be nice, but - * with 8 priorities this actually needs <20 insns - * per shift, the most expensive operation. - */ -typedef struct { - ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ - int size; -} etp_reqq; - -static etp_reqq req_queue; -static etp_reqq res_queue; - static void ecb_noinline ecb_cold reqq_init (etp_reqq *q) { @@ -259,27 +276,32 @@ } ETP_API_DECL int ecb_cold -etp_init (void (*want_poll)(void), void (*done_poll)(void)) +etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) { - X_MUTEX_CREATE (wrklock); - X_MUTEX_CREATE (reslock); - X_MUTEX_CREATE (reqlock); - X_COND_CREATE (reqwait); - - reqq_init (&req_queue); - reqq_init (&res_queue); - - wrk_first.next = - wrk_first.prev = &wrk_first; - - started = 0; - idle = 0; - nreqs = 0; - nready = 0; - npending = 0; - - want_poll_cb = want_poll; - done_poll_cb = done_poll; + X_MUTEX_CREATE (pool->wrklock); + X_MUTEX_CREATE (pool->reslock); + X_MUTEX_CREATE (pool->reqlock); + X_COND_CREATE (pool->reqwait); + + reqq_init (&pool->req_queue); + reqq_init (&pool->res_queue); + + pool->wrk_first.next = + pool->wrk_first.prev = &pool->wrk_first; + + pool->started = 0; + pool->idle = 0; + pool->nreqs = 0; + pool->nready = 0; + pool->npending = 0; + pool->wanted = 4; + + pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ + pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ + + pool->userdata = userdata; + pool->want_poll_cb = want_poll; + pool->done_poll_cb = done_poll; return 0; } @@ -306,6 +328,7 @@ ETP_REQ *req; struct timespec ts; etp_worker *self = (etp_worker *)thr_arg; + etp_pool pool = self->pool; etp_proc_init (); @@ -316,139 +339,141 @@ { ts.tv_sec = 0; - X_LOCK (reqlock); + X_LOCK (pool->reqlock); for (;;) { - req = reqq_shift (&req_queue); + req = reqq_shift (&pool->req_queue); - if (req) + if (ecb_expect_true (req)) break; if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ { - X_UNLOCK (reqlock); - X_LOCK (wrklock); - --started; - X_UNLOCK (wrklock); + X_UNLOCK (pool->reqlock); + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); goto quit; } - ++idle; + ++pool->idle; - if (idle <= max_idle) - /* we are allowed to idle, so do so without any timeout */ - X_COND_WAIT (reqwait, reqlock); + if (pool->idle <= pool->max_idle) + /* we are allowed to pool->idle, so do so without any timeout */ + X_COND_WAIT (pool->reqwait, pool->reqlock); else { /* initialise timeout once */ if (!ts.tv_sec) - ts.tv_sec = time (0) + idle_timeout; + ts.tv_sec = time (0) + pool->idle_timeout; - if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) + if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT) ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ } - --idle; + --pool->idle; } - --nready; + --pool->nready; - X_UNLOCK (reqlock); + X_UNLOCK (pool->reqlock); - if (req->type == ETP_TYPE_QUIT) + if (ecb_expect_false (req->type == ETP_TYPE_QUIT)) goto quit; ETP_EXECUTE (self, req); - X_LOCK (reslock); + X_LOCK (pool->reslock); - ++npending; + ++pool->npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); etp_worker_clear (self); - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); } quit: free (req); - X_LOCK (wrklock); + X_LOCK (pool->wrklock); etp_worker_free (self); - X_UNLOCK (wrklock); + X_UNLOCK (pool->wrklock); return 0; } static void ecb_cold -etp_start_thread (void) +etp_start_thread (etp_pool pool) { etp_worker *wrk = calloc (1, sizeof (etp_worker)); /*TODO*/ assert (("unable to allocate worker thread data", wrk)); - X_LOCK (wrklock); + wrk->pool = pool; + + X_LOCK (pool->wrklock); if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) { - wrk->prev = &wrk_first; - wrk->next = wrk_first.next; - wrk_first.next->prev = wrk; - wrk_first.next = wrk; - ++started; + wrk->prev = &pool->wrk_first; + wrk->next = pool->wrk_first.next; + pool->wrk_first.next->prev = wrk; + pool->wrk_first.next = wrk; + ++pool->started; } else free (wrk); - X_UNLOCK (wrklock); + X_UNLOCK (pool->wrklock); } static void -etp_maybe_start_thread (void) +etp_maybe_start_thread (etp_pool pool) { - if (ecb_expect_true (etp_nthreads () >= wanted)) + if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted)) return; - /* todo: maybe use idle here, but might be less exact */ - if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) + /* todo: maybe use pool->idle here, but might be less exact */ + if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; - etp_start_thread (); + etp_start_thread (pool); } static void ecb_cold -etp_end_thread (void) +etp_end_thread (etp_pool pool) { ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ req->type = ETP_TYPE_QUIT; req->pri = ETP_PRI_MAX - ETP_PRI_MIN; - X_LOCK (reqlock); - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); - - X_LOCK (wrklock); - --started; - X_UNLOCK (wrklock); + X_LOCK (pool->reqlock); + reqq_push (&pool->req_queue, req); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); + + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); } ETP_API_DECL int -etp_poll (void) +etp_poll (etp_pool pool) { unsigned int maxreqs; unsigned int maxtime; struct timeval tv_start, tv_now; - X_LOCK (reslock); - maxreqs = max_poll_reqs; - maxtime = max_poll_time; - X_UNLOCK (reslock); + X_LOCK (pool->reslock); + maxreqs = pool->max_poll_reqs; + maxtime = pool->max_poll_time; + X_UNLOCK (pool->reslock); if (maxtime) gettimeofday (&tv_start, 0); @@ -457,27 +482,27 @@ { ETP_REQ *req; - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); - X_LOCK (reslock); - req = reqq_shift (&res_queue); + X_LOCK (pool->reslock); + req = reqq_shift (&pool->res_queue); - if (req) + if (ecb_expect_true (req)) { - --npending; + --pool->npending; - if (!res_queue.size && done_poll_cb) - done_poll_cb (); + if (!pool->res_queue.size) + ETP_DONE_POLL (pool); } - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); - if (!req) + if (ecb_expect_false (!req)) return 0; - X_LOCK (reqlock); - --nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + --pool->nreqs; + X_UNLOCK (pool->reqlock); if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { @@ -508,25 +533,25 @@ } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp); +etp_grp_cancel (etp_pool pool, ETP_REQ *grp); ETP_API_DECL void -etp_cancel (ETP_REQ *req) +etp_cancel (etp_pool pool, ETP_REQ *req) { req->cancelled = 1; - etp_grp_cancel (req); + etp_grp_cancel (pool, req); } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp) +etp_grp_cancel (etp_pool pool, ETP_REQ *grp) { for (grp = grp->grp_first; grp; grp = grp->grp_next) - etp_cancel (grp); + etp_cancel (pool, grp); } ETP_API_DECL void -etp_submit (ETP_REQ *req) +etp_submit (etp_pool pool, ETP_REQ *req) { req->pri -= ETP_PRI_MIN; @@ -536,78 +561,78 @@ if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) { /* I hope this is worth it :/ */ - X_LOCK (reqlock); - ++nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + ++pool->nreqs; + X_UNLOCK (pool->reqlock); - X_LOCK (reslock); + X_LOCK (pool->reslock); - ++npending; + ++pool->npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); } else { - X_LOCK (reqlock); - ++nreqs; - ++nready; - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + ++pool->nreqs; + ++pool->nready; + reqq_push (&pool->req_queue, req); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); } } ETP_API_DECL void ecb_cold -etp_set_max_poll_time (double nseconds) +etp_set_max_poll_time (etp_pool pool, double seconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_time = nseconds * ETP_TICKS; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_time = seconds * ETP_TICKS; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold -etp_set_max_poll_reqs (unsigned int maxreqs) +etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_reqs = maxreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_reqs = maxreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold -etp_set_max_idle (unsigned int nthreads) +etp_set_max_idle (etp_pool pool, unsigned int threads) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - max_idle = nthreads; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->max_idle = threads; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold -etp_set_idle_timeout (unsigned int seconds) +etp_set_idle_timeout (etp_pool pool, unsigned int seconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - idle_timeout = seconds; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->idle_timeout = seconds; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold -etp_set_min_parallel (unsigned int nthreads) +etp_set_min_parallel (etp_pool pool, unsigned int threads) { - if (wanted < nthreads) - wanted = nthreads; + if (pool->wanted < threads) + pool->wanted = threads; } ETP_API_DECL void ecb_cold -etp_set_max_parallel (unsigned int nthreads) +etp_set_max_parallel (etp_pool pool, unsigned int threads) { - if (wanted > nthreads) - wanted = nthreads; + if (pool->wanted > threads) + pool->wanted = threads; - while (started > wanted) - etp_end_thread (); + while (pool->started > pool->wanted) + etp_end_thread (pool); }