--- libeio/etp.c 2015/06/25 15:59:57 1.3 +++ libeio/etp.c 2015/06/25 17:40:24 1.5 @@ -58,6 +58,11 @@ #define ETP_TICKS ((1000000 + 1023) >> 10) +enum { + ETP_FLAG_GROUPADD = 0x04, /* some request was added to the group */ + ETP_FLAG_DELAYED = 0x08, /* groiup request has been delayed */ +}; + /* calculate time difference in ~1/ETP_TICKS of a second */ ecb_inline int etp_tvdiff (struct timeval *tv1, struct timeval *tv2) @@ -103,8 +108,29 @@ return buf->ptr; } +/* + * a somewhat faster data structure might be nice, but + * with 8 priorities this actually needs <20 insns + * per shift, the most expensive operation. + */ +typedef struct +{ + ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ + int size; +} etp_reqq; + +struct etp_pool +{ + etp_reqq req_queue; + etp_reqq res_queue; +}; + +typedef struct etp_pool *etp_pool; + typedef struct etp_worker { + etp_pool pool; + struct etp_tmpbuf tmpbuf; /* locked by wrklock */ @@ -141,7 +167,7 @@ } ETP_API_DECL unsigned int -etp_nreqs (void) +etp_nreqs (etp_pool pool) { int retval; if (WORDACCESS_UNSAFE) X_LOCK (reqlock); @@ -151,7 +177,7 @@ } ETP_API_DECL unsigned int -etp_nready (void) +etp_nready (etp_pool pool) { unsigned int retval; @@ -163,7 +189,7 @@ } ETP_API_DECL unsigned int -etp_npending (void) +etp_npending (etp_pool pool) { unsigned int retval; @@ -175,7 +201,7 @@ } ETP_API_DECL unsigned int -etp_nthreads (void) +etp_nthreads (etp_pool pool) { unsigned int retval; @@ -186,19 +212,6 @@ return retval; } -/* - * a somewhat faster data structure might be nice, but - * with 8 priorities this actually needs <20 insns - * per shift, the most expensive operation. - */ -typedef struct { - ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ - int size; -} etp_reqq; - -static etp_reqq req_queue; -static etp_reqq res_queue; - static void ecb_noinline ecb_cold reqq_init (etp_reqq *q) { @@ -254,15 +267,15 @@ } ETP_API_DECL int ecb_cold -etp_init (void (*want_poll)(void), void (*done_poll)(void)) +etp_init (etp_pool pool, void (*want_poll)(void), void (*done_poll)(void)) { X_MUTEX_CREATE (wrklock); X_MUTEX_CREATE (reslock); X_MUTEX_CREATE (reqlock); X_COND_CREATE (reqwait); - reqq_init (&req_queue); - reqq_init (&res_queue); + reqq_init (&pool->req_queue); + reqq_init (&pool->res_queue); wrk_first.next = wrk_first.prev = &wrk_first; @@ -279,17 +292,116 @@ return 0; } -/* not yet in etp.c */ -X_THREAD_PROC (etp_proc); +static void ecb_noinline ecb_cold +etp_proc_init (void) +{ +#if HAVE_PRCTL_SET_NAME + /* provide a more sensible "thread name" */ + char name[16 + 1]; + const int namelen = sizeof (name) - 1; + int len; + + prctl (PR_GET_NAME, (unsigned long)name, 0, 0, 0); + name [namelen] = 0; + len = strlen (name); + strcpy (name + (len <= namelen - 4 ? len : namelen - 4), "/eio"); + prctl (PR_SET_NAME, (unsigned long)name, 0, 0, 0); +#endif +} + +X_THREAD_PROC (etp_proc) +{ + ETP_REQ *req; + struct timespec ts; + etp_worker *self = (etp_worker *)thr_arg; + etp_pool pool = self->pool; + + etp_proc_init (); + + /* try to distribute timeouts somewhat evenly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); + + for (;;) + { + ts.tv_sec = 0; + + X_LOCK (reqlock); + + for (;;) + { + req = reqq_shift (&pool->req_queue); + + if (req) + break; + + if (ts.tv_sec == 1) /* no request, but timeout detected, let's quit */ + { + X_UNLOCK (reqlock); + X_LOCK (wrklock); + --started; + X_UNLOCK (wrklock); + goto quit; + } + + ++idle; + + if (idle <= max_idle) + /* we are allowed to idle, so do so without any timeout */ + X_COND_WAIT (reqwait, reqlock); + else + { + /* initialise timeout once */ + if (!ts.tv_sec) + ts.tv_sec = time (0) + idle_timeout; + + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) + ts.tv_sec = 1; /* assuming this is not a value computed above.,.. */ + } + + --idle; + } + + --nready; + + X_UNLOCK (reqlock); + + if (req->type == ETP_TYPE_QUIT) + goto quit; + + ETP_EXECUTE (self, req); + + X_LOCK (reslock); + + ++npending; + + if (!reqq_push (&pool->res_queue, req) && want_poll_cb) + want_poll_cb (); + + etp_worker_clear (self); + + X_UNLOCK (reslock); + } + +quit: + free (req); + + X_LOCK (wrklock); + etp_worker_free (self); + X_UNLOCK (wrklock); + + return 0; +} static void ecb_cold -etp_start_thread (void) +etp_start_thread (etp_pool pool) { etp_worker *wrk = calloc (1, sizeof (etp_worker)); /*TODO*/ assert (("unable to allocate worker thread data", wrk)); + wrk->pool = pool; + X_LOCK (wrklock); if (xthread_create (&wrk->tid, etp_proc, (void *)wrk)) @@ -307,20 +419,20 @@ } static void -etp_maybe_start_thread (void) +etp_maybe_start_thread (etp_pool pool) { - if (ecb_expect_true (etp_nthreads () >= wanted)) + if (ecb_expect_true (etp_nthreads (pool) >= wanted)) return; /* todo: maybe use idle here, but might be less exact */ - if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) + if (ecb_expect_true (0 <= (int)etp_nthreads (pool) + (int)etp_npending (pool) - (int)etp_nreqs (pool))) return; - etp_start_thread (); + etp_start_thread (pool); } static void ecb_cold -etp_end_thread (void) +etp_end_thread (etp_pool pool) { ETP_REQ *req = calloc (1, sizeof (ETP_REQ)); /* will be freed by worker */ @@ -328,7 +440,7 @@ req->pri = ETP_PRI_MAX - ETP_PRI_MIN; X_LOCK (reqlock); - reqq_push (&req_queue, req); + reqq_push (&pool->req_queue, req); X_COND_SIGNAL (reqwait); X_UNLOCK (reqlock); @@ -338,7 +450,7 @@ } ETP_API_DECL int -etp_poll (void) +etp_poll (etp_pool pool) { unsigned int maxreqs; unsigned int maxtime; @@ -356,16 +468,16 @@ { ETP_REQ *req; - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); X_LOCK (reslock); - req = reqq_shift (&res_queue); + req = reqq_shift (&pool->res_queue); if (req) { --npending; - if (!res_queue.size && done_poll_cb) + if (!pool->res_queue.size && done_poll_cb) done_poll_cb (); } @@ -380,7 +492,7 @@ if (ecb_expect_false (req->type == ETP_TYPE_GROUP && req->size)) { - req->int1 = 1; /* mark request as delayed */ + req->flags |= ETP_FLAG_DELAYED; /* mark request as delayed */ continue; } else @@ -407,25 +519,25 @@ } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp); +etp_grp_cancel (etp_pool pool, ETP_REQ *grp); ETP_API_DECL void -etp_cancel (ETP_REQ *req) +etp_cancel (etp_pool pool, ETP_REQ *req) { req->cancelled = 1; - etp_grp_cancel (req); + etp_grp_cancel (pool, req); } ETP_API_DECL void -etp_grp_cancel (ETP_REQ *grp) +etp_grp_cancel (etp_pool pool, ETP_REQ *grp) { for (grp = grp->grp_first; grp; grp = grp->grp_next) - etp_cancel (grp); + etp_cancel (pool, grp); } ETP_API_DECL void -etp_submit (ETP_REQ *req) +etp_submit (etp_pool pool, ETP_REQ *req) { req->pri -= ETP_PRI_MIN; @@ -443,7 +555,7 @@ ++npending; - if (!reqq_push (&res_queue, req) && want_poll_cb) + if (!reqq_push (&pool->res_queue, req) && want_poll_cb) want_poll_cb (); X_UNLOCK (reslock); @@ -453,16 +565,16 @@ X_LOCK (reqlock); ++nreqs; ++nready; - reqq_push (&req_queue, req); + reqq_push (&pool->req_queue, req); X_COND_SIGNAL (reqwait); X_UNLOCK (reqlock); - etp_maybe_start_thread (); + etp_maybe_start_thread (pool); } } ETP_API_DECL void ecb_cold -etp_set_max_poll_time (double nseconds) +etp_set_max_poll_time (etp_pool pool, double nseconds) { if (WORDACCESS_UNSAFE) X_LOCK (reslock); max_poll_time = nseconds * ETP_TICKS; @@ -470,7 +582,7 @@ } ETP_API_DECL void ecb_cold -etp_set_max_poll_reqs (unsigned int maxreqs) +etp_set_max_poll_reqs (etp_pool pool, unsigned int maxreqs) { if (WORDACCESS_UNSAFE) X_LOCK (reslock); max_poll_reqs = maxreqs; @@ -478,7 +590,7 @@ } ETP_API_DECL void ecb_cold -etp_set_max_idle (unsigned int nthreads) +etp_set_max_idle (etp_pool pool, unsigned int nthreads) { if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads; @@ -486,7 +598,7 @@ } ETP_API_DECL void ecb_cold -etp_set_idle_timeout (unsigned int seconds) +etp_set_idle_timeout (etp_pool pool, unsigned int seconds) { if (WORDACCESS_UNSAFE) X_LOCK (reqlock); idle_timeout = seconds; @@ -494,19 +606,19 @@ } ETP_API_DECL void ecb_cold -etp_set_min_parallel (unsigned int nthreads) +etp_set_min_parallel (etp_pool pool, unsigned int nthreads) { if (wanted < nthreads) wanted = nthreads; } ETP_API_DECL void ecb_cold -etp_set_max_parallel (unsigned int nthreads) +etp_set_max_parallel (etp_pool pool, unsigned int nthreads) { if (wanted > nthreads) wanted = nthreads; while (started > wanted) - etp_end_thread (); + etp_end_thread (pool); }