--- libeio/etp.c 2013/04/14 09:43:19 1.1 +++ libeio/etp.c 2015/06/25 20:41:03 1.9 @@ -1,7 +1,7 @@ /* * libetp implementation * - * Copyright (c) 2007,2008,2009,2010,2011,2012,2013 Marc Alexander Lehmann + * Copyright (c) 2007,2008,2009,2010,2011,2012,2013,2015 Marc Alexander Lehmann * All rights reserved. * * Redistribution and use in source and binary forms, with or without modifica- @@ -54,10 +54,22 @@ # define ETP_TYPE_GROUP 1 #endif +#ifndef ETP_WANT_POLL +# define ETP_WANT_POLL(pool) pool->want_poll_cb (pool->userdata) +#endif +#ifndef ETP_DONE_POLL +# define ETP_DONE_POLL(pool) pool->done_poll_cb (pool->userdata) +#endif + #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) #define ETP_TICKS ((1000000 + 1023) >> 10) +enum { + ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */ + ETP_FLAG_DELAYED = 0x08, /* group request has been delayed */ +}; + /* calculate time difference in ~1/ETP_TICKS of a second */ ecb_inline int etp_tvdiff (struct timeval *tv1, struct timeval *tv2) @@ -66,30 +78,44 @@ + ((tv2->tv_usec - tv1->tv_usec) >> 10); } -static unsigned int started, idle, wanted = 4; +struct etp_tmpbuf +{ + void *ptr; + int len; +}; + +static void * +etp_tmpbuf_get (struct etp_tmpbuf *buf, int len) +{ + if (buf->len < len) + { + free (buf->ptr); + buf->ptr = malloc (buf->len = len); + } -static void (*want_poll_cb) (void); -static void (*done_poll_cb) (void); - -static unsigned int max_poll_time; /* reslock */ -static unsigned int max_poll_reqs; /* reslock */ + return buf->ptr; +} + +/* + * a somewhat faster data structure might be nice, but + * with 8 priorities this actually needs <20 insns + * per shift, the most expensive operation. 
+ */ +typedef struct +{ + ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ + int size; +} etp_reqq; -static unsigned int nreqs; /* reqlock */ -static unsigned int nready; /* reqlock */ -static unsigned int npending; /* reqlock */ -static unsigned int max_idle = 4; /* maximum number of threads that can idle indefinitely */ -static unsigned int idle_timeout = 10; /* number of seconds after which an idle threads exit */ - -static xmutex_t wrklock; -static xmutex_t reslock; -static xmutex_t reqlock; -static xcond_t reqwait; +typedef struct etp_pool *etp_pool; typedef struct etp_worker { - struct tmpbuf tmpbuf; + etp_pool pool; + + struct etp_tmpbuf tmpbuf; - /* locked by wrklock */ + /* locked by pool->wrklock */ struct etp_worker *prev, *next; xthread_t tid; @@ -99,10 +125,37 @@ #endif } etp_worker; -static etp_worker wrk_first; /* NOT etp */ +struct etp_pool +{ + void *userdata; + + etp_reqq req_queue; + etp_reqq res_queue; + + unsigned int started, idle, wanted; + + unsigned int max_poll_time; /* pool->reslock */ + unsigned int max_poll_reqs; /* pool->reslock */ + + unsigned int nreqs; /* pool->reqlock */ + unsigned int nready; /* pool->reqlock */ + unsigned int npending; /* pool->reqlock */ + unsigned int max_idle; /* maximum number of threads that can idle indefinitely */ + unsigned int idle_timeout; /* number of seconds after which an idle thread exits */ + + void (*want_poll_cb) (void *userdata); + void (*done_poll_cb) (void *userdata); + + xmutex_t wrklock; + xmutex_t reslock; + xmutex_t reqlock; + xcond_t reqwait; -#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock) -#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock) + etp_worker wrk_first; +}; + +#define ETP_WORKER_LOCK(wrk) X_LOCK (pool->wrklock) +#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (pool->wrklock) /* worker threads management */ @@ -123,64 +176,51 @@ } ETP_API_DECL unsigned int -etp_nreqs (void) +etp_nreqs (etp_pool pool) { int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - 
retval = nreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_nready (void) +etp_nready (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = nready; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->nready; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_npending (void) +etp_npending (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = npending; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->npending; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } ETP_API_DECL unsigned int -etp_nthreads (void) +etp_nthreads (etp_pool pool) { unsigned int retval; - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - retval = started; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + retval = pool->started; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); return retval; } -/* - * a somewhat faster data structure might be nice, but - * with 8 priorities this actually needs <20 insns - * per shift, the most expensive operation. 
- */ -typedef struct { - ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ - int size; -} etp_reqq; - -static etp_reqq req_queue; -static etp_reqq res_queue; - static void ecb_noinline ecb_cold reqq_init (etp_reqq *q) { @@ -221,11 +261,11 @@ for (pri = ETP_NUM_PRI; pri--; ) { - eio_req *req = q->qs[pri]; + ETP_REQ *req = q->qs[pri]; if (req) { - if (!(q->qs[pri] = (eio_req *)req->next)) + if (!(q->qs[pri] = (ETP_REQ *)req->next)) q->qe[pri] = 0; return req; @@ -236,100 +276,204 @@ } ETP_API_DECL int ecb_cold -etp_init (void (*want_poll)(void), void (*done_poll)(void)) +etp_init (etp_pool pool, void *userdata, void (*want_poll)(void *userdata), void (*done_poll)(void *userdata)) { - X_MUTEX_CREATE (wrklock); - X_MUTEX_CREATE (reslock); - X_MUTEX_CREATE (reqlock); - X_COND_CREATE (reqwait); - - reqq_init (&req_queue); - reqq_init (&res_queue); - - wrk_first.next = - wrk_first.prev = &wrk_first; - - started = 0; - idle = 0; - nreqs = 0; - nready = 0; - npending = 0; - - want_poll_cb = want_poll; - done_poll_cb = done_poll; + X_MUTEX_CREATE (pool->wrklock); + X_MUTEX_CREATE (pool->reslock); + X_MUTEX_CREATE (pool->reqlock); + X_COND_CREATE (pool->reqwait); + + reqq_init (&pool->req_queue); + reqq_init (&pool->res_queue); + + pool->wrk_first.next = + pool->wrk_first.prev = &pool->wrk_first; + + pool->started = 0; + pool->idle = 0; + pool->nreqs = 0; + pool->nready = 0; + pool->npending = 0; + pool->wanted = 4; + + pool->max_idle = 4; /* maximum number of threads that can idle indefinitely */ + pool->idle_timeout = 10; /* number of seconds after which an idle thread exits */ + + pool->userdata = userdata; + pool->want_poll_cb = want_poll; + pool->done_poll_cb = done_poll; return 0; } -/* not yet in etp.c */ -X_THREAD_PROC (etp_proc); +static void ecb_noinline ecb_cold +etp_proc_init (void) +{ +#if HAVE_PRCTL_SET_NAME + /* provide a more sensible "thread name" */ + char name[16 + 1]; + const int namelen = sizeof (name) - 1; + int len; + + prctl 
(PR_GET_NAME, (unsigned long)name, 0, 0, 0); + name [namelen] = 0; + len = strlen (name); + strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); + prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); +#endif +} + +X_THREAD_PROC (etp_proc) +{ + ETP_REQ *req; + struct timespec ts; + etp_worker *self = (etp_worker *)thr_arg; + etp_pool pool = self->pool; + + etp_proc_init (); + + /* try to distribute timeouts somewhat evenly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); + + for (;;) + { + ts.tv_sec = 0; + + X_LOCK (pool->reqlock); + + for (;;) + { + req = reqq_shift (&pool->req_queue); + + if (ecb_expect_true (req)) + break; + + if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ + { + X_UNLOCK (pool->reqlock); + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); + goto quit; + } + + ++pool->idle; + + if (pool->idle <= pool->max_idle) + /* we are allowed to idle, so do so without any timeout */ + X_COND_WAIT (pool->reqwait, pool->reqlock); + else + { + /* initialise timeout once */ + if (!ts.tv_sec) + ts.tv_sec = time (0) + pool->idle_timeout; + + if (X_COND_TIMEDWAIT (pool->reqwait, pool->reqlock, ts) == ETIMEDOUT) + ts.tv_sec = 1; /* assuming this is not a value computed above... 
*/ + } + + --pool->idle; + } + + --pool->nready; + + X_UNLOCK (pool->reqlock); + + if (ecb_expect_false (req->type == ETP_TYPE_QUIT)) + goto quit; + + ETP_EXECUTE (self, req); + + X_LOCK (pool->reslock); + + ++pool->npending; + + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); + + etp_worker_clear (self); + + X_UNLOCK (pool->reslock); + } + +quit: + free (req); + + X_LOCK (pool->wrklock); + etp_worker_free (self); + X_UNLOCK (pool->wrklock); + + return 0; +} static void ecb_cold -etp_start_thread (void) +etp_start_thread (etp_pool pool) { etp_worker *wrk = calloc (1, sizeof (etp_worker)); /*TODO*/ assert (("unable to allocate worker thread data", wrk)); - X_LOCK (wrklock); + wrk->pool = pool; + + X_LOCK (pool->wrklock); if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) { - wrk->prev = &wrk_first; - wrk->next = wrk_first.next; - wrk_first.next->prev = wrk; - wrk_first.next = wrk; - ++started; + wrk->prev = &pool->wrk_first; + wrk->next = pool->wrk_first.next; + pool->wrk_first.next->prev = wrk; + pool->wrk_first.next = wrk; + ++pool->started; } else free (wrk); - X_UNLOCK (wrklock); + X_UNLOCK (pool->wrklock); } static void -etp_maybe_start_thread (void) +etp_maybe_start_thread (etp_pool pool) { - if (ecb_expect_true (etp_nthreads () >= wanted)) + if (ecb_expect_true (etp_nthreads (pool) >= pool->wanted)) return; - /* todo: maybe use idle here, but might be less exact */ - if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) + /* todo: maybe use pool->idle here, but might be less exact */ + if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; - etp_start_thread (); + etp_start_thread (pool); } static void ecb_cold -etp_end_thread (void) +etp_end_thread (etp_pool pool) { - eio_req *req = calloc (1, sizeof (eio_req)); /* will be freed by worker */ + ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ req->type = ETP_TYPE_QUIT; 
req->pri = ETP_PRI_MAX - ETP_PRI_MIN; - X_LOCK (reqlock); - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); - - X_LOCK (wrklock); - --started; - X_UNLOCK (wrklock); + X_LOCK (pool->reqlock); + reqq_push (&pool->req_queue, req); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); + + X_LOCK (pool->wrklock); + --pool->started; + X_UNLOCK (pool->wrklock); } ETP_API_DECL int -etp_poll (void) +etp_poll (etp_pool pool) { unsigned int maxreqs; unsigned int maxtime; struct timeval tv_start, tv_now; - X_LOCK (reslock); - maxreqs = max_poll_reqs; - maxtime = max_poll_time; - X_UNLOCK (reslock); + X_LOCK (pool->reslock); + maxreqs = pool->max_poll_reqs; + maxtime = pool->max_poll_time; + X_UNLOCK (pool->reslock); if (maxtime) gettimeofday (&tv_start, 0); @@ -338,31 +482,31 @@ { ETP_REQ *req; - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); - X_LOCK (reslock); - req = reqq_shift (&res_queue); + X_LOCK (pool->reslock); + req = reqq_shift (&pool->res_queue); - if (req) + if (ecb_expect_true (req)) { - --npending; + --pool->npending; - if (!res_queue.size && done_poll_cb) - done_poll_cb (); + if (!pool->res_queue.size) + ETP_DONE_POLL (pool); } - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); - if (!req) + if (ecb_expect_false (!req)) return 0; - X_LOCK (reqlock); - --nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + --pool->nreqs; + X_UNLOCK (pool->reqlock); if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { - req->int1 = 1; /* mark request as delayed */ + req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ continue; } else @@ -389,25 +533,25 @@ } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp); +etp_grp_cancel (etp_pool pool, ETP_REQ *grp); ETP_API_DECL void -etp_cancel (ETP_REQ *req) +etp_cancel (etp_pool pool, ETP_REQ *req) { req->cancelled = 1; - etp_grp_cancel (req); + etp_grp_cancel (pool, req); } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp) +etp_grp_cancel (etp_pool pool, ETP_REQ 
*grp) { for (grp = grp->grp_first; grp; grp = grp->grp_next) - etp_cancel (grp); + etp_cancel (pool, grp); } ETP_API_DECL void -etp_submit (ETP_REQ *req) +etp_submit (etp_pool pool, ETP_REQ *req) { req->pri -= ETP_PRI_MIN; @@ -417,78 +561,78 @@ if (ecb_expect_false (req->type == ETP_TYPE_GROUP)) { /* I hope this is worth it :/ */ - X_LOCK (reqlock); - ++nreqs; - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + ++pool->nreqs; + X_UNLOCK (pool->reqlock); - X_LOCK (reslock); + X_LOCK (pool->reslock); - ++npending; + ++pool->npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) - want_poll_cb (); + if (!reqq_push (&pool->res_queue, req)) + ETP_WANT_POLL (pool); - X_UNLOCK (reslock); + X_UNLOCK (pool->reslock); } else { - X_LOCK (reqlock); - ++nreqs; - ++nready; - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); + X_LOCK (pool->reqlock); + ++pool->nreqs; + ++pool->nready; + reqq_push (&pool->req_queue, req); + X_COND_SIGNAL (pool->reqwait); + X_UNLOCK (pool->reqlock); - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); } } ETP_API_DECL void ecb_cold -etp_set_max_poll_time (double nseconds) +etp_set_max_poll_time (etp_pool pool, double nseconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_time = nseconds * ETP_TICKS; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_time = nseconds * ETP_TICKS; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold -etp_set_max_poll_reqs (unsigned int maxreqs) +etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) { - if (WORDACCESS_UNSAFE) X_LOCK (reslock); - max_poll_reqs = maxreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reslock); + pool->max_poll_reqs = maxreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reslock); } ETP_API_DECL void ecb_cold -etp_set_max_idle (unsigned int nthreads) +etp_set_max_idle (etp_pool pool, unsigned int nthreads) { - 
if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - max_idle = nthreads; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->max_idle = nthreads; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold -etp_set_idle_timeout (unsigned int seconds) +etp_set_idle_timeout (etp_pool pool, unsigned int seconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - idle_timeout = seconds; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (pool->reqlock); + pool->idle_timeout = seconds; + if (WORDACCESS_UNSAFE) X_UNLOCK (pool->reqlock); } ETP_API_DECL void ecb_cold -etp_set_min_parallel (unsigned int nthreads) +etp_set_min_parallel (etp_pool pool, unsigned int nthreads) { - if (wanted < nthreads) - wanted = nthreads; + if (pool->wanted < nthreads) + pool->wanted = nthreads; } ETP_API_DECL void ecb_cold -etp_set_max_parallel (unsigned int nthreads) +etp_set_max_parallel (etp_pool pool, unsigned int nthreads) { - if (wanted > nthreads) - wanted = nthreads; + if (pool->wanted > nthreads) + pool->wanted = nthreads; - while (started > wanted) - etp_end_thread (); + while (pool->started > pool->wanted) + etp_end_thread (pool); }