/* * libetp implementation * * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- * tion, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER- * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE- * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH- * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. * * Alternatively, the contents of this file may be used under the terms of * the GNU General Public License ("GPL") version 2 or any later version, * in which case the provisions of the GPL are applicable instead of * the above. If you wish to allow the use of your version of this file * only under the terms of the GPL and not to allow others to use your * version of this file under the BSD license, indicate your decision * by deleting the provisions above and replace them with the notice * and other provisions required by the GPL. If you do not delete the * provisions above, a recipient may use your version of this file under * either the BSD or the GPL. */ #if HAVE_SYS_PRCTL_H # include #endif #ifdef EIO_STACKSIZE # define X_STACKSIZE EIO_STACKSIZE #endif #include "xthread.h" #ifndef ETP_API_DECL # define ETP_API_DECL static #endif #ifndef ETP_PRI_MIN # define ETP_PRI_MIN 0 # define ETP_PRI_MAX 0 #endif #ifndef ETP_TYPE_QUIT # define ETP_TYPE_QUIT 0 #endif #ifndef ETP_TYPE_GROUP # define ETP_TYPE_GROUP 1 #endif #ifndef ETP_WANT_POLL # define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata) #endif #ifndef ETP_DONE_POLL # define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata) #endif #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) #define ETP_TICKS ((1000000 + 1023) >> 10) enum { ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */ ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */ }; /* calculate time difference in ~1/ETP_TICKS of a second */ ecb_inline int etp_tvdiff (struct timeval *tv1, struct timeval *tv2) { return (tv2->tv_sec - tv1->tv_sec ) * ETP_TICKS + ((tv2->tv_usec - tv1->tv_usec) >> 10); } struct etp_tmpbuf { void *ptr; int len; }; static void * etp_tmpbuf_get (struct etp_tmpbuf *buf, int len) { if (buf->len < len) { free (buf->ptr); buf->ptr = malloc (buf->len = len); } return buf->ptr; } /* * a somewhat faster data structure might be nice, but * with 8 priorities this actually needs <20 insns * per shift, the most expensive operation. */ typedef struct { ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ int size; } etp_reqq; typedef struct etp_pool *etp_pool; typedef struct etp_worker { etp_pool pool; struct etp_tmpbuf tmpbuf; /* locked by pool->wrklock */ struct etp_worker *prev, *next; xthread_t tid; #ifdef ETP_WORKER_COMMON ETP_WORKER_COMMON #endif } etp_worker; struct etp_pool { void *userdata; etp_reqq req_queue; etp_reqq res_queue; unsigned int started, idle, wanted; unsigned int max_poll_time; /* pool->reslock */ unsigned int max_poll_reqs; /* pool->reslock */ unsigned int nreqs; /* pool->reqlock */ unsigned int nready; /* pool->reqlock */ unsigned int npending; /* pool->reqlock */ unsigned int max_idle; /* maximum number of threads that can pool->idle indefinitely */ unsigned int idle_timeout; /* number of seconds after which an pool->idle threads exit */ void (*want_poll_cb) (void *userdata); void (*done_poll_cb) (void *userdata); xmutex_t wrklock; xmutex_t reslock; xmutex_t reqlock; xcond_t reqwait; etp_worker wrk_first; }; #define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) #define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) /* worker threads management */ static void etp_worker_clear (etp_worker *wrk) { } static void ecb_cold etp_worker_free (etp_worker *wrk) { free (wrk->tmpbuf.ptr); wrk->next->prev = wrk->prev; wrk->prev->next = wrk->next; free (wrk); } ETP_API_DECL unsigned int etp_nreqs (etp_pool pool) { int retval; if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); retval = pool->nreqs; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int etp_nready (etp_pool pool) { unsigned int retval; if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); retval = pool->nready; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int etp_npending (etp_pool pool) { unsigned int retval; if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); retval = pool->npending; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int etp_nthreads (etp_pool pool) { unsigned int retval; if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); retval = pool->started; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } static void ecb_noinline ecb_cold reqq_init (etp_reqq *q) { int pri; for (pri = 0; pri < ETP_NUM_PRI; ++pri) q->qs[pri] = q->qe[pri] = 0; q->size = 0; } static int ecb_noinline reqq_push (etp_reqq *q, ETP_REQ *req) { int pri = req->pri; req->next = 0; if (q->qe[pri]) { q->qe[pri]->next = req; q->qe[pri] = req; } else q->qe[pri] = q->qs[pri] = req; return q->size++; } static ETP_REQ * ecb_noinline reqq_shift (etp_reqq *q) { int pri; if (!q->size) return 0; --q->size; for (pri = ETP_NUM_PRI; pri--; ) { ETP_REQ *req = q->qs[pri]; if (req) { if (!(q->qs[pri] = (ETP_REQ *)req->next)) q->qe[pri] = 0; return req; } } abort (); } ETP_API_DECL int ecb_cold etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) { X_MUTEX_CREATE (pool->wrklock); X_MUTEX_CREATE (pool->reslock); X_MUTEX_CREATE (pool->reqlock); X_COND_CREATE (pool->reqwait); reqq_init (&pool->req_queue); reqq_init (&pool->res_queue); pool->wrk_first.next = pool->wrk_first.prev = &pool->wrk_first; pool->started = 0; pool->idle = 0; pool->nreqs = 0; pool->nready = 0; pool->npending = 0; pool->wanted = 4; pool->max_idle = 4; /* maximum number of threads that can pool->idle indefinitely */ pool->idle_timeout = 10; /* number of seconds after which an pool->idle threads exit */ pool->userdata = userdata; pool->want_poll_cb = want_poll; pool->done_poll_cb = done_poll; return 0; } static void ecb_noinline ecb_cold etp_proc_init (void) { #if HAVE_PRCTL_SET_NAME /* provide a more sensible "thread name" */ char name[16 + 1]; const int namelen = sizeof (name) - 1; int len; prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); name [namelen] = 0; len = strlen (name); strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); #endif } X_THREAD_PROC (etp_proc) { ETP_REQ *req; struct timespec ts; etp_worker *self = (etp_worker *)thr_arg; etp_pool pool = self->pool; etp_proc_init (); /* try to distribute timeouts somewhat evenly */ ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = 0; X_LOCK (pool->reqlock); for (;;) { req = reqq_shift (&pool->req_queue); if (ecb_expect_true (req)) break; if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ { X_UNLOCK (pool->reqlock); X_LOCK (pool->wrklock); --pool->started; X_UNLOCK (pool->wrklock); goto quit; } ++pool->idle; if (pool->idle <= pool->max_idle) /* we are allowed to pool->idle, so do so without any timeout */ X_COND_WAIT (pool->reqwait, pool->reqlock); else { /* initialise timeout once */ if (!ts.tv_sec) ts.tv_sec = time (0) + pool->idle_timeout; if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT) ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ } --pool->idle; } --pool->nready; X_UNLOCK (pool->reqlock); if (ecb_expect_false (req->type == ETP_TYPE_QUIT)) goto quit; ETP_EXECUTE (self, req); X_LOCK (pool->reslock); ++pool->npending; if (!reqq_push (&pool->res_queue, req)) ETP_WANT_POLL (pool); etp_worker_clear (self); X_UNLOCK (pool->reslock); } quit: free (req); X_LOCK (pool->wrklock); etp_worker_free (self); X_UNLOCK (pool->wrklock); return 0; } static void ecb_cold etp_start_thread (etp_pool pool) { etp_worker *wrk = calloc (1, sizeof (etp_worker)); /*TODO*/ assert (("unable to allocate worker thread data", wrk)); wrk->pool = pool; X_LOCK (pool->wrklock); if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) { wrk->prev = &pool->wrk_first; wrk->next = pool->wrk_first.next; pool->wrk_first.next->prev = wrk; pool->wrk_first.next = wrk; ++pool->started; } else free (wrk); X_UNLOCK (pool->wrklock); } static void etp_maybe_start_thread (etp_pool pool) { if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted)) return; /* todo: maybe use pool->idle here, but might be less exact */ if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; etp_start_thread (pool); } static void ecb_cold etp_end_thread (etp_pool pool) { ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ req->type = ETP_TYPE_QUIT; req->pri = ETP_PRI_MAX - ETP_PRI_MIN; X_LOCK (pool->reqlock); reqq_push (&pool->req_queue, req); X_COND_SIGNAL (pool->reqwait); X_UNLOCK (pool->reqlock); X_LOCK (pool->wrklock); --pool->started; X_UNLOCK (pool->wrklock); } ETP_API_DECL int etp_poll (etp_pool pool) { unsigned int maxreqs; unsigned int maxtime; struct timeval tv_start, tv_now; X_LOCK (pool->reslock); maxreqs = pool->max_poll_reqs; maxtime = pool->max_poll_time; X_UNLOCK (pool->reslock); if (maxtime) gettimeofday (&tv_start, 0); for (;;) { ETP_REQ *req; etp_maybe_start_thread (pool); X_LOCK (pool->reslock); req = reqq_shift (&pool->res_queue); if (ecb_expect_true (req)) { --pool->npending; if (!pool->res_queue.size) ETP_DONE_POLL (pool); } X_UNLOCK (pool->reslock); if (ecb_expect_false (!req)) return 0; X_LOCK (pool->reqlock); --pool->nreqs; X_UNLOCK (pool->reqlock); if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ continue; } else { int res = ETP_FINISH (req); if (ecb_expect_false (res)) return res; } if (ecb_expect_false (maxreqs && !--maxreqs)) break; if (maxtime) { gettimeofday (&tv_now, 0); if (etp_tvdiff (&tv_start, &tv_now) >= maxtime) break; } } errno = EAGAIN; return -1; } ETP_API_DECL void etp_grp_cancel (etp_pool pool, ETP_REQ *grp); ETP_API_DECL void etp_cancel (etp_pool pool, ETP_REQ *req) { req->cancelled = 1; etp_grp_cancel (pool, req); } ETP_API_DECL void etp_grp_cancel (etp_pool pool, ETP_REQ *grp) { for (grp = grp->grp_first; grp; grp = grp->grp_next) etp_cancel (pool, grp); } ETP_API_DECL void etp_submit (etp_pool pool, ETP_REQ *req) { req->pri -= ETP_PRI_MIN; if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; if (ecb_expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) { /* I hope this is worth it :/ */ X_LOCK (pool->reqlock); ++pool->nreqs; X_UNLOCK (pool->reqlock); X_LOCK (pool->reslock); ++pool->npending; if (!reqq_push (&pool->res_queue, req)) ETP_WANT_POLL (pool); X_UNLOCK (pool->reslock); } else { X_LOCK (pool->reqlock); ++pool->nreqs; ++pool->nready; reqq_push (&pool->req_queue, req); X_COND_SIGNAL (pool->reqwait); X_UNLOCK (pool->reqlock); etp_maybe_start_thread (pool); } } ETP_API_DECL void ecb_cold etp_set_max_poll_time (etp_pool pool, double seconds) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); pool->max_poll_time = seconds * ETP_TICKS; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); pool->max_poll_reqs = maxreqs; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold etp_set_max_idle (etp_pool pool, unsigned int threads) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); pool->max_idle = threads; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold etp_set_idle_timeout (etp_pool pool, unsigned int seconds) { if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); pool->idle_timeout = seconds; if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold etp_set_min_parallel (etp_pool pool, unsigned int threads) { if (pool->wanted < threads) pool->wanted = threads; } ETP_API_DECL void ecb_cold etp_set_max_parallel (etp_pool pool, unsigned int threads) { if (pool->wanted > threads) pool->wanted = threads; while (pool->started > pool->wanted) etp_end_thread (pool); }