--- IO-AIO/AIO.xs 2006/10/28 01:24:19 1.84 +++ IO-AIO/AIO.xs 2006/10/28 23:32:29 1.85 @@ -70,11 +70,11 @@ * this is conservatice, likely most arches this runs * on have atomic word read/writes. */ -#ifndef WORDREAD_UNSAFE +#ifndef WORDACCESS_UNSAFE # if __i386 || __x86_64 -# define WORDREAD_UNSAFE 0 +# define WORDACCESS_UNSAFE 0 # else -# define WORDREAD_UNSAFE 1 +# define WORDACCESS_UNSAFE 1 # endif #endif @@ -147,6 +147,18 @@ NUM_PRI = PRI_MAX + PRI_BIAS + 1, }; +#define AIO_TICKS ((1000000 + 1023) >> 10) + +static unsigned int max_poll_time = 0; +static unsigned int max_poll_reqs = 0; + +/* calculcate time difference in ~1/AIO_TICKS of a second */ +static int tvdiff (struct timeval *tv1, struct timeval *tv2) +{ + return (tv2->tv_sec - tv1->tv_sec ) * AIO_TICKS + + ((tv2->tv_usec - tv1->tv_usec) >> 10); +} + static int next_pri = DEFAULT_PRI + PRI_BIAS; static unsigned int started, idle, wanted; @@ -209,7 +221,7 @@ static pthread_mutex_t reqlock = AIO_MUTEX_INIT; static pthread_cond_t reqwait = PTHREAD_COND_INITIALIZER; -#if WORDREAD_UNSAFE +#if WORDACCESS_UNSAFE static unsigned int get_nready () { @@ -233,10 +245,22 @@ return retval; } +static unsigned int get_nthreads () +{ + unsigned int retval; + + LOCK (wrklock); + retval = started; + UNLOCK (wrklock); + + return retval; +} + #else # define get_nready() nready # define get_npending() npending +# define get_nthreads() started #endif @@ -294,7 +318,7 @@ abort (); } -static int poll_cb (int max); +static int poll_cb (); static void req_invoke (aio_req req); static void req_free (aio_req req); static void req_cancel (aio_req req); @@ -562,11 +586,11 @@ static void maybe_start_thread () { - if (started >= wanted) + if (get_nthreads () >= wanted) return; /* todo: maybe use idle here, but might be less exact */ - if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) + if (0 <= (int)get_nthreads () + (int)get_npending () - (int)nreqs) return; start_thread (); @@ -604,6 +628,13 @@ UNLOCK (wrklock); } +static void set_max_idle (int nthreads) +{ + if (WORDACCESS_UNSAFE) LOCK (reqlock); + max_idle = nthreads <= 0 ? 1 : nthreads; + if (WORDACCESS_UNSAFE) UNLOCK (reqlock); +} + static void min_parallel (int nthreads) { if (wanted < nthreads) @@ -626,9 +657,9 @@ while (nreqs) { int size; - if (WORDREAD_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) LOCK (reslock); size = res_queue.size; - if (WORDREAD_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) UNLOCK (reslock); if (size) return; @@ -642,16 +673,21 @@ } } -static int poll_cb (int max) +static int poll_cb () { dSP; int count = 0; + int maxreqs = max_poll_reqs; int do_croak = 0; + struct timeval tv_start, tv_now; aio_req req; + if (max_poll_time) + gettimeofday (&tv_start, 0); + for (;;) { - while (max <= 0 || count < max) + for (;;) { maybe_start_thread (); @@ -704,6 +740,17 @@ } req_free (req); + + if (maxreqs && !--maxreqs) + break; + + if (max_poll_time) + { + gettimeofday (&tv_now, 0); + + if (tvdiff (&tv_start, &tv_now) >= max_poll_time) + break; + } } if (nreqs <= max_outstanding) @@ -711,7 +758,7 @@ poll_wait (); - max = 0; + ++maxreqs; } return count; @@ -1212,6 +1259,18 @@ } void +max_poll_reqs (int nreqs) + PROTOTYPE: $ + CODE: + max_poll_reqs = nreqs; + +void +max_poll_time (double nseconds) + PROTOTYPE: $ + CODE: + max_poll_time = nseconds * AIO_TICKS; + +void min_parallel (int nthreads) PROTOTYPE: $ @@ -1219,6 +1278,12 @@ max_parallel (int nthreads) PROTOTYPE: $ +void +max_idle (int nthreads) + PROTOTYPE: $ + CODE: + set_max_idle (nthreads); + int max_outstanding (int maxreqs) PROTOTYPE: $ @@ -1564,15 +1629,7 @@ poll_cb(...) PROTOTYPE: CODE: - RETVAL = poll_cb (0); - OUTPUT: - RETVAL - -int -poll_some(int max = 0) - PROTOTYPE: $ - CODE: - RETVAL = poll_cb (max); + RETVAL = poll_cb (); OUTPUT: RETVAL @@ -1607,6 +1664,16 @@ OUTPUT: RETVAL +int +nthreads() + PROTOTYPE: + CODE: + if (WORDACCESS_UNSAFE) LOCK (wrklock); + RETVAL = started; + if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + OUTPUT: + RETVAL + PROTOTYPES: DISABLE MODULE = IO::AIO PACKAGE = IO::AIO::REQ