--- libeio/etp.c 2015/06/25 17:40:24 1.5 +++ libeio/etp.c 2018/08/14 09:29:50 1.12 @@ -1,7 +1,7 @@ /* * libetp implementation * - * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann + * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -54,6 +54,13 @@ # define ETP_TYPE_GROUP 1 #endif +#ifndef ETP_WANT_POLL +# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata) +#endif +#ifndef ETP_DONE_POLL +# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata) +#endif + #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) #define ETP_TICKS ((1000000 + 1023) >> 10) @@ -71,25 +78,6 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } -static unsigned int started, idle, wanted = 4; - -static void (*want_poll_cb) (void); -static void (*done_poll_cb) (void); - -static unsigned int max_poll_time; /* reslock */ -static unsigned int max_poll_reqs; /* reslock */ - -static unsigned int nreqs; /* reqlock */ -static unsigned int nready; /* reqlock */ -static unsigned int npending; /* reqlock */ -static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */ -static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */ - -static xmutex_t wrklock; -static xmutex_t reslock; -static xmutex_t reqlock; -static xcond_t reqwait; - struct etp_tmpbuf { void *ptr; @@ -119,12 +107,6 @@ int size; } etp_reqq; -struct etp_pool -{ - etp_reqq req_queue; - etp_reqq res_queue; -}; - typedef struct etp_pool *etp_pool; typedef struct etp_worker @@ -133,7 +115,7 @@ struct etp_tmpbuf tmpbuf; - /* locked by wrklock */ + /* locked by pool->wrklock */ struct etp_worker *prev, *next; xthread_t tid; @@ -143,10 +125,37 @@ #endif } etp_worker; -static etp_worker wrk_first; /* NOT etp */ +struct etp_pool +{ + void *userdata; + + etp_reqq req_queue; + etp_reqq res_queue; -#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock) -#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock) + unsigned int started, idle, wanted; + + unsigned int max_poll_time; /* pool->reslock */ + unsigned int max_poll_reqs; /* pool->reslock */ + + unsigned int nreqs; /* pool->reqlock */ + unsigned int nready; /* pool->reqlock */ + unsigned int npending; /* pool->reqlock */ + unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */ + unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */ + + void (*want_poll_cb) (void *userdata); + void (*done_poll_cb) (void *userdata); + + xmutex_t wrklock; + xmutex_t reslock; + xmutex_t reqlock; + xcond_t reqwait; + + etp_worker wrk_first; +}; + +#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) +#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) /* worker threads management */ @@ -170,9 +179,9 @@ etp_nreqs (etp_pool pool) { int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = nreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } @@ -181,9 +190,9 @@ { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = nready; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nready; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } @@ -193,9 +202,9 @@ { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = npending; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->npending; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } @@ -205,9 +214,9 @@ { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = started; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->started; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } @@ -267,27 +276,32 @@ } ETP_API_DECL int ecb_cold -etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void)) +etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) { - X_MUTEX_CREATE (wrklock); - X_MUTEX_CREATE (reslock); - X_MUTEX_CREATE (reqlock); - X_COND_CREATE (reqwait); + X_MUTEX_CREATE (pool->wrklock); + X_MUTEX_CREATE (pool->reslock); + X_MUTEX_CREATE (pool->reqlock); + X_COND_CREATE (pool->reqwait); reqq_init (&pool->req_queue); reqq_init (&pool->res_queue); - wrk_first.next = - wrk_first.prev = &wrk_first; - - started = 0; - idle = 0; - nreqs = 0; - nready = 0; - npending = 0; + pool->wrk_first.next = + pool->wrk_first.prev = &pool->wrk_first; - want_poll_cb = want_poll; - done_poll_cb = done_poll; + pool->started = 0; + pool->idle = 0; + pool->nreqs = 0; + pool->nready = 0; + pool->npending = 0; + pool->wanted = 4; + + pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ + pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ + + pool->userdata = userdata; + pool->want_poll_cb = want_poll; + pool->done_poll_cb = done_poll; return 0; } @@ -325,69 +339,69 @@ { ts.tv_sec = 0; - X_LOCK (reqlock); + X_LOCK (pool->reqlock); for (;;) { req = reqq_shift (&pool->req_queue); - if (req) + if (ecb_expect_true (req)) break; if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ { - X_UNLOCK (reqlock); - X_LOCK (wrklock); - --started; - X_UNLOCK (wrklock); + X_UNLOCK (pool->reqlock); + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); goto quit; } - ++idle; + ++pool->idle; - if (idle <= max_idle) - /* we are allowed to idle, so do so without any timeout */ - X_COND_WAIT (reqwait, reqlock); + if (pool->idle <= pool->max_idle) + /* we are allowed to pool->idle, so do so without any timeout */ + X_COND_WAIT (pool->reqwait, pool->reqlock); else { /* initialise timeout once */ if (!ts.tv_sec) - ts.tv_sec = time (0) + idle_timeout; + ts.tv_sec = time (0) + pool->idle_timeout; - if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) + if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT) ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ } - --idle; + --pool->idle; } - --nready; + --pool->nready; - X_UNLOCK (reqlock); + X_UNLOCK (pool->reqlock); - if (req->type == ETP_TYPE_QUIT) + if (ecb_expect_false (req->type == ETP_TYPE_QUIT)) goto quit; ETP_EXECUTE (self, req); - X_LOCK (reslock); + X_LOCK (pool->reslock); - ++npending; + ++pool->npending; - if (!reqq_push (&pool->res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); etp_worker_clear (self); - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); } quit: free (req); - X_LOCK (wrklock); + X_LOCK (pool->wrklock); etp_worker_free (self); - X_UNLOCK (wrklock); + X_UNLOCK (pool->wrklock); return 0; } @@ -402,29 +416,29 @@ wrk->pool = pool; - X_LOCK (wrklock); + X_LOCK (pool->wrklock); if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) { - wrk->prev = &wrk_first; - wrk->next = wrk_first.next; - wrk_first.next->prev = wrk; - wrk_first.next = wrk; - ++started; + wrk->prev = &pool->wrk_first; + wrk->next = pool->wrk_first.next; + pool->wrk_first.next->prev = wrk; + pool->wrk_first.next = wrk; + ++pool->started; } else free (wrk); - X_UNLOCK (wrklock); + X_UNLOCK (pool->wrklock); } static void etp_maybe_start_thread (etp_pool pool) { - if (ecb_expect_true (etp_nthreads (pool) >= wanted)) + if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted)) return; - /* todo: maybe use idle here, but might be less exact */ + /* todo: maybe use pool->idle here, but might be less exact */ if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; @@ -439,14 +453,14 @@ req->type = ETP_TYPE_QUIT; req->pri = ETP_PRI_MAX - ETP_PRI_MIN; - X_LOCK (reqlock); + X_LOCK (pool->reqlock); reqq_push (&pool->req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); - X_LOCK (wrklock); - --started; - X_UNLOCK (wrklock); + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); } ETP_API_DECL int @@ -456,10 +470,10 @@ unsigned int maxtime; struct timeval tv_start, tv_now; - X_LOCK (reslock); - maxreqs = max_poll_reqs; - maxtime = max_poll_time; - X_UNLOCK (reslock); + X_LOCK (pool->reslock); + maxreqs = pool->max_poll_reqs; + maxtime = pool->max_poll_time; + X_UNLOCK (pool->reslock); if (maxtime) gettimeofday (&tv_start, 0); @@ -470,25 +484,25 @@ etp_maybe_start_thread (pool); - X_LOCK (reslock); + X_LOCK (pool->reslock); req = reqq_shift (&pool->res_queue); - if (req) + if (ecb_expect_true (req)) { - --npending; + --pool->npending; - if (!pool->res_queue.size && done_poll_cb) - done_poll_cb (); + if (!pool->res_queue.size) + ETP_DONE_POLL (pool); } - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); - if (!req) + if (ecb_expect_false (!req)) return 0; - X_LOCK (reqlock); - --nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + --pool->nreqs; + X_UNLOCK (pool->reqlock); if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { @@ -547,78 +561,78 @@ if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) { /* I hope this is worth it :/ */ - X_LOCK (reqlock); - ++nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + ++pool->nreqs; + X_UNLOCK (pool->reqlock); - X_LOCK (reslock); + X_LOCK (pool->reslock); - ++npending; + ++pool->npending; - if (!reqq_push (&pool->res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); } else { - X_LOCK (reqlock); - ++nreqs; - ++nready; + X_LOCK (pool->reqlock); + ++pool->nreqs; + ++pool->nready; reqq_push (&pool->req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); etp_maybe_start_thread (pool); } } ETP_API_DECL void ecb_cold -etp_set_max_poll_time (etp_pool pool, double nseconds) +etp_set_max_poll_time (etp_pool pool, double seconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_time = nseconds * ETP_TICKS; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_time = seconds * ETP_TICKS; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_reqs = maxreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_reqs = maxreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold -etp_set_max_idle (etp_pool pool, unsigned int nthreads) +etp_set_max_idle (etp_pool pool, unsigned int threads) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - max_idle = nthreads; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->max_idle = threads; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold etp_set_idle_timeout (etp_pool pool, unsigned int seconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - idle_timeout = seconds; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->idle_timeout = seconds; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold -etp_set_min_parallel (etp_pool pool, unsigned int nthreads) +etp_set_min_parallel (etp_pool pool, unsigned int threads) { - if (wanted < nthreads) - wanted = nthreads; + if (pool->wanted < threads) + pool->wanted = threads; } ETP_API_DECL void ecb_cold -etp_set_max_parallel (etp_pool pool, unsigned int nthreads) +etp_set_max_parallel (etp_pool pool, unsigned int threads) { - if (wanted > nthreads) - wanted = nthreads; + if (pool->wanted > threads) + pool->wanted = threads; - while (started > wanted) + while (pool->started > pool->wanted) etp_end_thread (pool); }