--- libeio/eio.c 2008/05/13 19:34:11 1.13 +++ libeio/eio.c 2008/05/17 12:17:25 1.14 @@ -1,3 +1,42 @@ +/* + * libeio implementation + * + * Copyright (c) 2007,2008 Marc Alexander Lehmann + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without modifica- + * tion, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED + * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER- + * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO + * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE- + * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; + * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, + * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH- + * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + * + * Alternatively, the contents of this file may be used under the terms of + * the GNU General Public License ("GPL") version 2 or any later version, + * in which case the provisions of the GPL are applicable instead of + * the above. If you wish to allow the use of your version of this file + * only under the terms of the GPL and not to allow others to use your + * version of this file under the BSD license, indicate your decision + * by deleting the provisions above and replace them with the notice + * and other provisions required by the GPL. If you do not delete the + * provisions above, a recipient may use your version of this file under + * either the BSD or the GPL. + */ + #include "eio.h" #include "xthread.h" @@ -72,9 +111,9 @@ #define dBUF \ char *eio_buf; \ - X_LOCK (etplock); \ + ETP_WORKER_LOCK (self); \ self->dbuf = eio_buf = malloc (EIO_BUFSIZE); \ - X_UNLOCK (etplock); \ + ETP_WORKER_UNLOCK (self); \ errno = ENOMEM; \ if (!eio_buf) \ return -1; @@ -83,6 +122,34 @@ /*****************************************************************************/ +#define ETP_PRI_MIN EIO_PRI_MIN +#define ETP_PRI_MAX EIO_PRI_MAX + +#define ETP_REQ eio_req +#define ETP_DESTROY(req) eio_destroy (req) +static int eio_finish (eio_req *req); +#define ETP_FINISH(req) eio_finish (req) + +#define ETP_WORKER_CLEAR(req) \ + if (wrk->dbuf) \ + { \ + free (wrk->dbuf); \ + wrk->dbuf = 0; \ + } \ + \ + if (wrk->dirp) \ + { \ + closedir (wrk->dirp); \ + wrk->dirp = 0; \ + } +#define ETP_WORKER_COMMON \ + void *dbuf; \ + DIR *dirp; + +/*****************************************************************************/ + +#define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) + /* calculcate time difference in ~1/EIO_TICKS of a second */ static int tvdiff (struct timeval *tv1, struct timeval *tv2) { @@ -92,56 +159,48 @@ static unsigned int started, idle, wanted = 4; -typedef struct etp_pool -{ - void (*want_poll_cb) (void); - void (*done_poll_cb) (void); +static void (*want_poll_cb) (void); +static void (*done_poll_cb) (void); - unsigned int max_poll_time; - unsigned int max_poll_reqs; -} etp_pool; +static unsigned int max_poll_time; /* reslock */ +static unsigned int max_poll_reqs; /* reslock */ -static volatile unsigned int nreqs, nready, npending; +static volatile unsigned int nreqs; /* reqlock */ +static volatile unsigned int nready; /* reqlock */ +static volatile unsigned int npending; /* reqlock */ static volatile unsigned int max_idle = 4; -static mutex_t etplock = X_MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; static mutex_t reslock = X_MUTEX_INIT; static mutex_t reqlock = X_MUTEX_INIT; static cond_t reqwait = X_COND_INIT; -typedef struct worker +typedef struct etp_worker { - /* locked by etplock */ - struct worker *prev, *next; + /* locked by wrklock */ + struct etp_worker *prev, *next; thread_t tid; - /* locked by reslock, reqlock or etplock */ - eio_req *req; /* currently processed request */ - void *dbuf; - DIR *dirp; -} worker; + /* locked by reslock, reqlock or wrklock */ + ETP_REQ *req; /* currently processed request */ -static worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */ + ETP_WORKER_COMMON +} etp_worker; + +static etp_worker wrk_first = { &wrk_first, &wrk_first, 0 }; /* NOT etp */ + +#define ETP_WORKER_LOCK(wrk) X_LOCK (wrklock) +#define ETP_WORKER_UNLOCK(wrk) X_UNLOCK (wrklock) /* worker threads management */ -static void worker_clear (worker *wrk) +static void etp_worker_clear (etp_worker *wrk) { - if (wrk->dirp) - { - closedir (wrk->dirp); - wrk->dirp = 0; - } - - if (wrk->dbuf) - { - free (wrk->dbuf); - wrk->dbuf = 0; - } + ETP_WORKER_CLEAR (wrk); } -static void worker_free (worker *wrk) +static void etp_worker_free (etp_worker *wrk) { wrk->next->prev = wrk->prev; wrk->prev->next = wrk->next; @@ -149,12 +208,16 @@ free (wrk); } -unsigned int eio_nreqs (void) +static unsigned int etp_nreqs (void) { - return nreqs; + int retval; + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); + retval = nreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + return retval; } -unsigned int eio_nready (void) +static unsigned int etp_nready (void) { unsigned int retval; @@ -165,7 +228,7 @@ return retval; } -unsigned int eio_npending (void) +static unsigned int etp_npending (void) { unsigned int retval; @@ -176,7 +239,7 @@ return retval; } -unsigned int eio_nthreads (void) +static unsigned int etp_nthreads (void) { unsigned int retval; @@ -193,14 +256,14 @@ * per shift, the most expensive operation. */ typedef struct { - eio_req *qs[EIO_NUM_PRI], *qe[EIO_NUM_PRI]; /* qstart, qend */ + ETP_REQ *qs[ETP_NUM_PRI], *qe[ETP_NUM_PRI]; /* qstart, qend */ int size; -} reqq; +} etp_reqq; -static reqq req_queue; -static reqq res_queue; +static etp_reqq req_queue; +static etp_reqq res_queue; -static int reqq_push (reqq *q, eio_req *req) +static int reqq_push (etp_reqq *q, ETP_REQ *req) { int pri = req->pri; req->next = 0; @@ -216,7 +279,7 @@ return q->size++; } -static eio_req *reqq_shift (reqq *q) +static ETP_REQ *reqq_shift (etp_reqq *q) { int pri; @@ -225,7 +288,7 @@ --q->size; - for (pri = EIO_NUM_PRI; pri--; ) + for (pri = ETP_NUM_PRI; pri--; ) { eio_req *req = q->qs[pri]; @@ -243,7 +306,7 @@ static void etp_atfork_prepare (void) { - X_LOCK (etplock); + X_LOCK (wrklock); X_LOCK (reqlock); X_LOCK (reslock); #if !HAVE_PREADWRITE @@ -264,28 +327,28 @@ #endif X_UNLOCK (reslock); X_UNLOCK (reqlock); - X_UNLOCK (etplock); + X_UNLOCK (wrklock); } static void etp_atfork_child (void) { - eio_req *prv; + ETP_REQ *prv; while (prv = reqq_shift (&req_queue)) - eio_destroy (prv); + ETP_DESTROY (prv); while (prv = reqq_shift (&res_queue)) - eio_destroy (prv); + ETP_DESTROY (prv); while (wrk_first.next != &wrk_first) { - worker *wrk = wrk_first.next; + etp_worker *wrk = wrk_first.next; if (wrk->req) - eio_destroy (wrk->req); + ETP_DESTROY (wrk->req); - worker_clear (wrk); - worker_free (wrk); + etp_worker_clear (wrk); + etp_worker_free (wrk); } started = 0; @@ -304,19 +367,199 @@ } static int -etp_init (etp_pool *etp, void (*want_poll)(void), void (*done_poll)(void)) +etp_init (void (*want_poll)(void), void (*done_poll)(void)) { static pthread_once_t doinit = PTHREAD_ONCE_INIT; pthread_once (&doinit, etp_once_init); - memset (etp, 0, sizeof *etp); + want_poll_cb = want_poll; + done_poll_cb = done_poll; +} + +X_THREAD_PROC (etp_proc); + +static void etp_start_thread (void) +{ + etp_worker *wrk = calloc (1, sizeof (etp_worker)); + + /*TODO*/ + assert (("unable to allocate worker thread data", wrk)); + + X_LOCK (wrklock); + + if (thread_create (&wrk->tid, etp_proc, (void *)wrk)) + { + wrk->prev = &wrk_first; + wrk->next = wrk_first.next; + wrk_first.next->prev = wrk; + wrk_first.next = wrk; + ++started; + } + else + free (wrk); + + X_UNLOCK (wrklock); +} + +static void etp_maybe_start_thread (void) +{ + if (etp_nthreads () >= wanted) + return; + + /* todo: maybe use idle here, but might be less exact */ + if (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ()) + return; + + etp_start_thread (); +} + +static void etp_end_thread (void) +{ + eio_req *req = calloc (1, sizeof (eio_req)); + + req->type = -1; + req->pri = ETP_PRI_MAX - ETP_PRI_MIN; + + X_LOCK (reqlock); + reqq_push (&req_queue, req); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); + + X_LOCK (wrklock); + --started; + X_UNLOCK (wrklock); +} + +static int etp_poll (void) +{ + unsigned int maxreqs; + unsigned int maxtime; + struct timeval tv_start, tv_now; + + X_LOCK (reslock); + maxreqs = max_poll_reqs; + maxtime = max_poll_time; + X_UNLOCK (reslock); + + if (maxtime) + gettimeofday (&tv_start, 0); + + for (;;) + { + ETP_REQ *req; + + etp_maybe_start_thread (); + + X_LOCK (reslock); + req = reqq_shift (&res_queue); + + if (req) + { + --npending; + + if (!res_queue.size && done_poll_cb) + done_poll_cb (); + } + + X_UNLOCK (reslock); + + if (!req) + return 0; + + X_LOCK (reqlock); + --nreqs; + X_UNLOCK (reqlock); + + if (req->type == EIO_GROUP && req->size) + { + req->int1 = 1; /* mark request as delayed */ + continue; + } + else + { + int res = ETP_FINISH (req); + if (res) + return res; + } + + if (maxreqs && !--maxreqs) + break; + + if (maxtime) + { + gettimeofday (&tv_now, 0); + + if (tvdiff (&tv_start, &tv_now) >= maxtime) + break; + } + } + + errno = EAGAIN; + return -1; +} + +static void etp_cancel (ETP_REQ *req) +{ + X_LOCK (wrklock); + req->flags |= EIO_FLAG_CANCELLED; + X_UNLOCK (wrklock); + + eio_grp_cancel (req); +} + +static void etp_submit (ETP_REQ *req) +{ + req->pri -= ETP_PRI_MIN; + + if (req->pri < ETP_PRI_MIN - ETP_PRI_MIN) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; + if (req->pri > ETP_PRI_MAX - ETP_PRI_MIN) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; + + X_LOCK (reqlock); + ++nreqs; + ++nready; + reqq_push (&req_queue, req); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); + + etp_maybe_start_thread (); +} + +static void etp_set_max_poll_time (double nseconds) +{ + if (WORDACCESS_UNSAFE) X_LOCK (reslock); + max_poll_time = nseconds; + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); +} + +static void etp_set_max_poll_reqs (unsigned int maxreqs) +{ + if (WORDACCESS_UNSAFE) X_LOCK (reslock); + max_poll_reqs = maxreqs; + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); +} - etp->want_poll_cb = want_poll; - etp->done_poll_cb = done_poll; +static void etp_set_max_idle (unsigned int nthreads) +{ + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); + max_idle = nthreads <= 0 ? 1 : nthreads; + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } -static etp_pool etp; +static void etp_set_min_parallel (unsigned int nthreads) +{ + if (wanted < nthreads) + wanted = nthreads; +} + +static void etp_set_max_parallel (unsigned int nthreads) +{ + if (wanted > nthreads) + wanted = nthreads; + + while (started > wanted) + etp_end_thread (); +} /*****************************************************************************/ @@ -337,8 +580,6 @@ } } -static int eio_finish (eio_req *req); - static int grp_dec (eio_req *grp) { --grp->size; @@ -396,178 +637,62 @@ void eio_cancel (eio_req *req) { - X_LOCK (etplock); - req->flags |= EIO_FLAG_CANCELLED; - X_UNLOCK (etplock); - - eio_grp_cancel (req); + etp_cancel (req); } -X_THREAD_PROC (eio_proc); - -static void start_thread (void) +void eio_submit (eio_req *req) { - worker *wrk = calloc (1, sizeof (worker)); - - /*TODO*/ - assert (("unable to allocate worker thread data", wrk)); - - X_LOCK (etplock); - - if (thread_create (&wrk->tid, eio_proc, (void *)wrk)) - { - wrk->prev = &wrk_first; - wrk->next = wrk_first.next; - wrk_first.next->prev = wrk; - wrk_first.next = wrk; - ++started; - } - else - free (wrk); - - X_UNLOCK (etplock); + etp_submit (req); } -static void maybe_start_thread (void) +unsigned int eio_nreqs (void) { - if (eio_nthreads () >= wanted) - return; - - /* todo: maybe use idle here, but might be less exact */ - if (0 <= (int)eio_nthreads () + (int)eio_npending () - (int)eio_nreqs ()) - return; - - start_thread (); + return etp_nreqs (); } -void eio_submit (eio_req *req) +unsigned int eio_nready (void) { - req->pri += EIO_PRI_BIAS; - - if (req->pri < EIO_PRI_MIN + EIO_PRI_BIAS) req->pri = EIO_PRI_MIN + EIO_PRI_BIAS; - if (req->pri > EIO_PRI_MAX + EIO_PRI_BIAS) req->pri = EIO_PRI_MAX + EIO_PRI_BIAS; - - ++nreqs; - - X_LOCK (reqlock); - ++nready; - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); - - maybe_start_thread (); + return etp_nready (); } -static void end_thread (void) +unsigned int eio_npending (void) { - eio_req *req = calloc (1, sizeof (eio_req)); - - req->type = EIO_QUIT; - req->pri = EIO_PRI_MAX + EIO_PRI_BIAS; - - X_LOCK (reqlock); - reqq_push (&req_queue, req); - X_COND_SIGNAL (reqwait); - X_UNLOCK (reqlock); + return etp_npending (); +} - X_LOCK (etplock); - --started; - X_UNLOCK (etplock); +unsigned int eio_nthreads (void) +{ + return etp_nthreads (); } void eio_set_max_poll_time (double nseconds) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - etp.max_poll_time = nseconds; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + etp_set_max_poll_time (nseconds); } void eio_set_max_poll_reqs (unsigned int maxreqs) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - etp.max_poll_reqs = maxreqs; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + etp_set_max_poll_reqs (maxreqs); } void eio_set_max_idle (unsigned int nthreads) { - if (WORDACCESS_UNSAFE) X_LOCK (reqlock); - max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); + etp_set_max_idle (nthreads); } void eio_set_min_parallel (unsigned int nthreads) { - if (wanted < nthreads) - wanted = nthreads; + etp_set_min_parallel (nthreads); } void eio_set_max_parallel (unsigned int nthreads) { - if (wanted > nthreads) - wanted = nthreads; - - while (started > wanted) - end_thread (); + etp_set_max_parallel (nthreads); } int eio_poll (void) { - int maxreqs = etp.max_poll_reqs; - struct timeval tv_start, tv_now; - eio_req *req; - - if (etp.max_poll_time) - gettimeofday (&tv_start, 0); - - for (;;) - { - maybe_start_thread (); - - X_LOCK (reslock); - req = reqq_shift (&res_queue); - - if (req) - { - --npending; - - if (!res_queue.size && etp.done_poll_cb) - etp.done_poll_cb (); - } - - X_UNLOCK (reslock); - - if (!req) - return 0; - - --nreqs; - - if (req->type == EIO_GROUP && req->size) - { - req->int1 = 1; /* mark request as delayed */ - continue; - } - else - { - int res = eio_finish (req); - if (res) - return res; - } - - if (maxreqs && !--maxreqs) - break; - - if (etp.max_poll_time) - { - gettimeofday (&tv_now, 0); - - if (tvdiff (&tv_start, &tv_now) >= etp.max_poll_time) - break; - } - } - - errno = EAGAIN; - return -1; + return etp_poll (); } /*****************************************************************************/ @@ -707,7 +832,7 @@ /* sendfile always needs emulation */ static ssize_t -eio__sendfile (int ofd, int ifd, off_t offset, size_t count, worker *self) +eio__sendfile (int ofd, int ifd, off_t offset, size_t count, etp_worker *self) { ssize_t res; @@ -801,7 +926,7 @@ /* read a full directory */ static void -eio__scandir (eio_req *req, worker *self) +eio__scandir (eio_req *req, etp_worker *self) { DIR *dirp; union @@ -815,12 +940,12 @@ int memofs = 0; int res = 0; - X_LOCK (etplock); + X_LOCK (wrklock); self->dirp = dirp = opendir (req->ptr1); self->dbuf = u = malloc (sizeof (*u)); req->flags |= EIO_FLAG_PTR2_FREE; req->ptr2 = names = malloc (memlen); - X_UNLOCK (etplock); + X_UNLOCK (wrklock); if (dirp && u && names) for (;;) @@ -842,9 +967,9 @@ while (memofs + len > memlen) { memlen *= 2; - X_LOCK (etplock); + X_LOCK (wrklock); req->ptr2 = names = realloc (names, memlen); - X_UNLOCK (etplock); + X_UNLOCK (wrklock); if (!names) break; @@ -866,9 +991,9 @@ #define ALLOC(len) \ if (!req->ptr2) \ { \ - X_LOCK (etplock); \ + X_LOCK (wrklock); \ req->flags |= EIO_FLAG_PTR2_FREE; \ - X_UNLOCK (etplock); \ + X_UNLOCK (wrklock); \ req->ptr2 = malloc (len); \ if (!req->ptr2) \ { \ @@ -878,19 +1003,17 @@ } \ } -X_THREAD_PROC (eio_proc) +X_THREAD_PROC (etp_proc) { - eio_req *req; + ETP_REQ *req; struct timespec ts; - worker *self = (worker *)thr_arg; + etp_worker *self = (etp_worker *)thr_arg; /* try to distribute timeouts somewhat randomly */ ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { - ts.tv_sec = time (0) + IDLE_TIMEOUT; - X_LOCK (reqlock); for (;;) @@ -902,21 +1025,21 @@ ++idle; + ts.tv_sec = time (0) + IDLE_TIMEOUT; if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; X_UNLOCK (reqlock); - X_LOCK (etplock); + X_LOCK (wrklock); --started; - X_UNLOCK (etplock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ X_COND_WAIT (reqwait, reqlock); - ts.tv_sec = time (0) + IDLE_TIMEOUT; } --idle; @@ -1020,7 +1143,7 @@ req->result = 0; break; - case EIO_QUIT: + case -1: goto quit; default: @@ -1034,19 +1157,19 @@ ++npending; - if (!reqq_push (&res_queue, req) && etp.want_poll_cb) - etp.want_poll_cb (); + if (!reqq_push (&res_queue, req) && want_poll_cb) + want_poll_cb (); self->req = 0; - worker_clear (self); + etp_worker_clear (self); X_UNLOCK (reslock); } quit: - X_LOCK (etplock); - worker_free (self); - X_UNLOCK (etplock); + X_LOCK (wrklock); + etp_worker_free (self); + X_UNLOCK (wrklock); return 0; } @@ -1055,7 +1178,7 @@ int eio_init (void (*want_poll)(void), void (*done_poll)(void)) { - etp_init (&etp, want_poll, done_poll); + etp_init (want_poll, done_poll); } static void eio_api_destroy (eio_req *req) @@ -1323,7 +1446,7 @@ ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count) { - worker wrk; + etp_worker wrk; wrk.dbuf = 0;