--- IO-AIO/AIO.xs 2006/10/28 00:17:30 1.83 +++ IO-AIO/AIO.xs 2006/10/28 23:32:29 1.85 @@ -45,6 +45,9 @@ # endif #endif +/* number of seconds after which idle threads exit */ +#define IDLE_TIMEOUT 10 + /* used for struct dirent, AIX doesn't provide it */ #ifndef NAME_MAX # define NAME_MAX 4096 @@ -67,11 +70,11 @@ * this is conservatice, likely most arches this runs * on have atomic word read/writes. */ -#ifndef WORDREAD_UNSAFE +#ifndef WORDACCESS_UNSAFE # if __i386 || __x86_64 -# define WORDREAD_UNSAFE 0 +# define WORDACCESS_UNSAFE 0 # else -# define WORDREAD_UNSAFE 1 +# define WORDACCESS_UNSAFE 1 # endif #endif @@ -144,9 +147,21 @@ NUM_PRI = PRI_MAX + PRI_BIAS + 1, }; +#define AIO_TICKS ((1000000 + 1023) >> 10) + +static unsigned int max_poll_time = 0; +static unsigned int max_poll_reqs = 0; + +/* calculcate time difference in ~1/AIO_TICKS of a second */ +static int tvdiff (struct timeval *tv1, struct timeval *tv2) +{ + return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS + + ((tv2->tv_usec - tv1->tv_usec) >> 10); +} + static int next_pri = DEFAULT_PRI + PRI_BIAS; -static unsigned int started, wanted; +static unsigned int started, idle, wanted; #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP @@ -198,6 +213,7 @@ } static volatile unsigned int nreqs, nready, npending; +static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; static int respipe [2]; @@ -205,7 +221,7 @@ static pthread_mutex_t reqlock = AIO_MUTEX_INIT; static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; -#if WORDREAD_UNSAFE +#if WORDACCESS_UNSAFE static unsigned int get_nready () { @@ -229,10 +245,22 @@ return retval; } +static unsigned int get_nthreads () +{ + unsigned int retval; + + LOCK (wrklock); + retval = started; + UNLOCK (wrklock); + + return retval; +} + #else # define get_nready() nready # define get_npending() npending +# define get_nthreads() started #endif @@ -290,7 +318,7 @@ abort (); } -static int poll_cb (int max); +static int poll_cb (); static void req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -558,35 +586,13 @@ static void maybe_start_thread () { -#if 0 - static struct timeval last; - struct timeval diff, now; -#endif - - if (started >= wanted) + if (get_nthreads () >= wanted) return; - if (nready <= nreqs - get_nready () - get_npending ()) + /* todo: maybe use idle here, but might be less exact */ + if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs) return; -#if 0 - gettimeofday (&now, 0); - - diff.tv_sec = now.tv_sec - last.tv_sec; - diff.tv_usec = now.tv_usec - last.tv_usec; - - if (diff.tv_usec < 0) - { - --diff.tv_sec; - diff.tv_usec += 1000000; - } - - if (!diff.tv_sec && diff.tv_usec < 10000) - return; - - last = now; -#endif - start_thread (); } @@ -622,6 +628,13 @@ UNLOCK (wrklock); } +static void set_max_idle (int nthreads) +{ + if (WORDACCESS_UNSAFE) LOCK (reqlock); + max_idle = nthreads <= 0 ? 1 : nthreads; + if (WORDACCESS_UNSAFE) UNLOCK (reqlock); +} + static void min_parallel (int nthreads) { if (wanted < nthreads) @@ -644,9 +657,9 @@ while (nreqs) { int size; - if (WORDREAD_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) LOCK (reslock); size = res_queue.size; - if (WORDREAD_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) UNLOCK (reslock); if (size) return; @@ -660,16 +673,21 @@ } } -static int poll_cb (int max) +static int poll_cb () { dSP; int count = 0; + int maxreqs = max_poll_reqs; int do_croak = 0; + struct timeval tv_start, tv_now; aio_req req; + if (max_poll_time) + gettimeofday (&tv_start, 0); + for (;;) { - while (max <= 0 || count < max) + for (;;) { maybe_start_thread (); @@ -722,6 +740,17 @@ } req_free (req); + + if (maxreqs && !--maxreqs) + break; + + if (max_poll_time) + { + gettimeofday (&tv_now, 0); + + if (tvdiff (&tv_start, &tv_now) >= max_poll_time) + break; + } } if (nreqs <= max_outstanding) @@ -729,7 +758,7 @@ poll_wait (); - max = 0; + ++maxreqs; } return count; @@ -1006,10 +1035,17 @@ static void *aio_proc (void *thr_arg) { aio_req req; + struct timespec ts; worker *self = (worker *)thr_arg; + /* try to distribute timeouts somewhat evenly */ + ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) + * (1000000000UL / 1024UL); + for (;;) { + ts.tv_sec = time (0) + IDLE_TIMEOUT; + LOCK (reqlock); for (;;) @@ -1019,7 +1055,27 @@ if (req) break; - pthread_cond_wait (&reqwait, &reqlock); + ++idle; + + if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) + == ETIMEDOUT) + { + if (idle > max_idle) + { + --idle; + UNLOCK (reqlock); + LOCK (wrklock); + --started; + UNLOCK (wrklock); + goto quit; + } + + /* we are allowed to idle, so do so without any timeout */ + pthread_cond_wait (&reqwait, &reqlock); + ts.tv_sec = time (0) + IDLE_TIMEOUT; + } + + --idle; } --nready; @@ -1069,11 +1125,7 @@ break; case REQ_QUIT: - LOCK (wrklock); - worker_free (self); - --started; - UNLOCK (wrklock); - return 0; + goto quit; default: req->result = ENOSYS; @@ -1095,6 +1147,13 @@ UNLOCK (reslock); } + +quit: + LOCK (wrklock); + worker_free (self); + UNLOCK (wrklock); + + return 0; } /*****************************************************************************/ @@ -1146,8 +1205,11 @@ worker_free (wrk); } - started = 0; - nreqs = 0; + started = 0; + idle = 0; + nreqs = 0; + nready = 0; + npending = 0; close (respipe [0]); close (respipe [1]); @@ -1194,11 +1256,21 @@ create_pipe (); pthread_atfork (atfork_prepare, atfork_parent, atfork_child); - - start_thread (); } void +max_poll_reqs (int nreqs) + PROTOTYPE: $ + CODE: + max_poll_reqs = nreqs; + +void +max_poll_time (double nseconds) + PROTOTYPE: $ + CODE: + max_poll_time = nseconds * AIO_TICKS; + +void min_parallel (int nthreads) PROTOTYPE: $ @@ -1206,6 +1278,12 @@ max_parallel (int nthreads) PROTOTYPE: $ +void +max_idle (int nthreads) + PROTOTYPE: $ + CODE: + set_max_idle (nthreads); + int max_outstanding (int maxreqs) PROTOTYPE: $ @@ -1551,15 +1629,7 @@ poll_cb(...) PROTOTYPE: CODE: - RETVAL = poll_cb (0); - OUTPUT: - RETVAL - -int -poll_some(int max = 0) - PROTOTYPE: $ - CODE: - RETVAL = poll_cb (max); + RETVAL = poll_cb (); OUTPUT: RETVAL @@ -1594,6 +1664,16 @@ OUTPUT: RETVAL +int +nthreads() + PROTOTYPE: + CODE: + if (WORDACCESS_UNSAFE) LOCK (wrklock); + RETVAL = started; + if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + OUTPUT: + RETVAL + PROTOTYPES: DISABLE MODULE = IO::AIO PACKAGE = IO::AIO::REQ