--- IO-AIO/AIO.xs 2006/10/27 19:17:23 1.80 +++ IO-AIO/AIO.xs 2006/10/28 01:24:19 1.84 @@ -45,6 +45,9 @@ # endif #endif +/* number of seconds after which idle threads exit */ +#define IDLE_TIMEOUT 10 + /* used for struct dirent, AIX doesn't provide it */ #ifndef NAME_MAX # define NAME_MAX 4096 @@ -94,7 +97,7 @@ REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_FSYNC, REQ_FDATASYNC, REQ_UNLINK, REQ_RMDIR, REQ_RENAME, - REQ_READDIR, + REQ_MKNOD, REQ_READDIR, REQ_LINK, REQ_SYMLINK, REQ_GROUP, REQ_NOP, REQ_BUSY, @@ -146,7 +149,7 @@ static int next_pri = DEFAULT_PRI + PRI_BIAS; -static unsigned int started, wanted; +static unsigned int started, idle, wanted; #if __linux && defined (PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) # define AIO_MUTEX_INIT PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP @@ -198,6 +201,7 @@ } static volatile unsigned int nreqs, nready, npending; +static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; static int respipe [2]; @@ -558,35 +562,13 @@ static void maybe_start_thread () { -#if 0 - static struct timeval last; - struct timeval diff, now; -#endif - if (started >= wanted) return; - if (nready <= nreqs - get_nready () - get_npending ()) + /* todo: maybe use idle here, but might be less exact */ + if ((int)nready <= (int)started - (int)(nreqs - get_nready () - get_npending ())) return; -#if 0 - gettimeofday (&now, 0); - - diff.tv_sec = now.tv_sec - last.tv_sec; - diff.tv_usec = now.tv_usec - last.tv_usec; - - if (diff.tv_usec < 0) - { - --diff.tv_sec; - diff.tv_usec += 1000000; - } - - if (!diff.tv_sec && diff.tv_usec < 10000) - return; - - last = now; -#endif - start_thread (); } @@ -612,7 +594,10 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - req_send (req); + LOCK (reqlock); + reqq_push (&req_queue, req); + pthread_cond_signal (&reqwait); + UNLOCK (reqlock); LOCK (wrklock); --started; @@ -1003,10 +988,17 @@ static void *aio_proc (void *thr_arg) { aio_req req; + struct timespec ts; worker *self = (worker *)thr_arg; + /* try to distribute timeouts somewhat evenly */ + ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) + * (1000000000UL / 1024UL); + for (;;) { + ts.tv_sec = time (0) + IDLE_TIMEOUT; + LOCK (reqlock); for (;;) @@ -1016,7 +1008,27 @@ if (req) break; - pthread_cond_wait (&reqwait, &reqlock); + ++idle; + + if (pthread_cond_timedwait (&reqwait, &reqlock, &ts) + == ETIMEDOUT) + { + if (idle > max_idle) + { + --idle; + UNLOCK (reqlock); + LOCK (wrklock); + --started; + UNLOCK (wrklock); + goto quit; + } + + /* we are allowed to idle, so do so without any timeout */ + pthread_cond_wait (&reqwait, &reqlock); + ts.tv_sec = time (0) + IDLE_TIMEOUT; + } + + --idle; } --nready; @@ -1045,6 +1057,7 @@ case REQ_RENAME: req->result = rename (req->data2ptr, req->dataptr); break; case REQ_LINK: req->result = link (req->data2ptr, req->dataptr); break; case REQ_SYMLINK: req->result = symlink (req->data2ptr, req->dataptr); break; + case REQ_MKNOD: req->result = mknod (req->data2ptr, req->mode, (dev_t)req->offset); break; case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; case REQ_FSYNC: req->result = fsync (req->fd); break; @@ -1065,11 +1078,7 @@ break; case REQ_QUIT: - LOCK (wrklock); - worker_free (self); - --started; - UNLOCK (wrklock); - return 0; + goto quit; default: req->result = ENOSYS; @@ -1091,6 +1100,13 @@ UNLOCK (reslock); } + +quit: + LOCK (wrklock); + worker_free (self); + UNLOCK (wrklock); + + return 0; } /*****************************************************************************/ @@ -1142,8 +1158,11 @@ worker_free (wrk); } - started = 0; - nreqs = 0; + started = 0; + idle = 0; + nreqs = 0; + nready = 0; + npending = 0; close (respipe [0]); close (respipe [1]); @@ -1180,14 +1199,16 @@ BOOT: { HV *stash = gv_stashpv ("IO::AIO", 1); + newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); + newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); + newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); + newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); create_pipe (); pthread_atfork (atfork_prepare, atfork_parent, atfork_child); - - start_thread (); } void @@ -1428,6 +1449,25 @@ REQ_SEND; } + +void +aio_mknod (pathname,mode,dev,callback=&PL_sv_undef) + SV * pathname + SV * callback + UV mode + UV dev + PPCODE: +{ + dREQ; + + req->type = REQ_MKNOD; + req->data = newSVsv (pathname); + req->dataptr = SvPVbyte_nolen (req->data); + req->mode = (mode_t)mode; + req->offset = dev; + + REQ_SEND; +} void aio_busy (delay,callback=&PL_sv_undef)