--- IO-AIO/AIO.xs 2006/10/26 14:35:34 1.78 +++ IO-AIO/AIO.xs 2006/10/26 16:28:33 1.79 @@ -63,6 +63,18 @@ # define STACKSIZE 16384 #endif +/* wether word reads are potentially non-atomic. + * this is conservatice, likely most arches this runs + * on have atomic word read/writes. + */ +#ifndef WORDREAD_UNSAFE +# if __i386 || __x86_64 +# define WORDREAD_UNSAFE 0 +# else +# define WORDREAD_UNSAFE 1 +# endif +#endif + /* buffer size for various temporary buffers */ #define AIO_BUFSIZE 65536 @@ -135,7 +147,7 @@ static int next_pri = DEFAULT_PRI + PRI_BIAS; static unsigned int started, wanted; -static volatile unsigned int nreqs; +static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_outstanding = 0xffffffff; static int respipe [2]; @@ -328,13 +340,9 @@ while (nreqs) { int size; -#if !(__i386 || __x86_64) /* safe without sempahore on these archs */ - LOCK (reslock); -#endif + if (WORDREAD_UNSAFE) LOCK (reslock); size = res_queue.size; -#if !(__i386 || __x86_64) /* safe without sempahore on these archs */ - UNLOCK (reslock); -#endif + if (WORDREAD_UNSAFE) UNLOCK (reslock); if (size) return; @@ -352,8 +360,6 @@ if (!(req->flags & FLAG_CANCELLED) && SvOK (req->callback)) { - errno = req->errorno; - ENTER; SAVETMPS; PUSHMARK (SP); @@ -428,6 +434,7 @@ break; } + errno = req->errorno; PUTBACK; call_sv (req->callback, G_VOID | G_EVAL); @@ -515,6 +522,8 @@ if (req) { + --npending; + if (!res_queue.size) { /* read any signals sent by the worker threads */ @@ -616,6 +625,7 @@ ++nreqs; LOCK (reqlock); + ++nready; reqq_push (&req_queue, req); pthread_cond_signal (&reqwait); UNLOCK (reqlock); @@ -947,6 +957,8 @@ pthread_cond_wait (&reqwait, &reqlock); } + --nready; + UNLOCK (reqlock); errno = 0; /* strictly unnecessary */ @@ -1001,6 +1013,8 @@ LOCK (reslock); + ++npending; + if (!reqq_push (&res_queue, req)) /* write a dummy byte to the pipe so fh becomes ready */ write (respipe [1], &respipe, 1); @@ -1394,20 +1408,27 @@ REQ_SEND; } -void -aioreq_pri (int pri = DEFAULT_PRI) - CODE: - if (pri < PRI_MIN) pri = PRI_MIN; - if (pri > PRI_MAX) pri = PRI_MAX; - next_pri = pri + PRI_BIAS; +int +aioreq_pri (int pri = 0) + PROTOTYPE: ;$ + CODE: + RETVAL = next_pri - PRI_BIAS; + if (items > 0) + { + if (pri < PRI_MIN) pri = PRI_MIN; + if (pri > PRI_MAX) pri = PRI_MAX; + next_pri = pri + PRI_BIAS; + } + OUTPUT: + RETVAL void aioreq_nice (int nice = 0) - CODE: - nice = next_pri - nice; - if (nice < PRI_MIN) nice = PRI_MIN; - if (nice > PRI_MAX) nice = PRI_MAX; - next_pri = nice + PRI_BIAS; + CODE: + nice = next_pri - nice; + if (nice < PRI_MIN) nice = PRI_MIN; + if (nice > PRI_MAX) nice = PRI_MAX; + next_pri = nice + PRI_BIAS; void flush () @@ -1468,6 +1489,26 @@ OUTPUT: RETVAL +int +nready() + PROTOTYPE: + CODE: + if (WORDREAD_UNSAFE) LOCK (reqlock); + RETVAL = nready; + if (WORDREAD_UNSAFE) UNLOCK (reqlock); + OUTPUT: + RETVAL + +int +npending() + PROTOTYPE: + CODE: + if (WORDREAD_UNSAFE) LOCK (reslock); + RETVAL = npending; + if (WORDREAD_UNSAFE) UNLOCK (reslock); + OUTPUT: + RETVAL + PROTOTYPES: DISABLE MODULE = IO::AIO PACKAGE = IO::AIO::REQ @@ -1528,7 +1569,11 @@ CODE: { int i; - AV *av = newAV (); + AV *av; + + grp->errorno = errno; + + av = newAV (); for (i = 1; i < items; ++i ) av_push (av, newSVsv (ST (i))); @@ -1538,6 +1583,11 @@ } void +errno (aio_req grp, int errorno = errno) + CODE: + grp->errorno = errorno; + +void limit (aio_req grp, int limit) CODE: grp->fd2 = limit;