--- IO-AIO/AIO.xs 2007/06/03 09:44:17 1.102 +++ IO-AIO/AIO.xs 2007/07/08 09:09:34 1.103 @@ -6,21 +6,63 @@ #include "perl.h" #include "XSUB.h" -#include "autoconf/config.h" - #include #include #include -#include -#include #include #include #include -#include #include -#include #include -#include + +#ifdef _WIN32 + +# define SIGIO 0 + typedef Direntry_t X_DIRENT; +#undef malloc +#undef free + +// perl overrides all those nice win32 functions +# undef open +# undef read +# undef write +# undef stat +# undef fstat +# define lstat stat +# undef truncate +# undef ftruncate +# undef open +# undef close +# undef unlink +# undef rmdir +# undef rename +# undef lseek + +# define chown(a,b,c) (errno = ENOSYS, -1) +# define fchown(a,b,c) (errno = ENOSYS, -1) +# define fchmod(a,b) (errno = ENOSYS, -1) +# define symlink(a,b) (errno = ENOSYS, -1) +# define readlink(a,b,c) (errno = ENOSYS, -1) +# define mknod(a,b,c) (errno = ENOSYS, -1) +# define truncate(a,b) (errno = ENOSYS, -1) +# define ftruncate(fd,o) chsize ((fd), (o)) +# define fsync(fd) _commit (fd) +# define opendir(fd) (errno = ENOSYS, 0) +# define readdir(fd) (errno = ENOSYS, -1) +# define closedir(fd) (errno = ENOSYS, -1) +# define mkdir(a,b) mkdir (a) + +#else + +# include "autoconf/config.h" +# include +# include +# include +# include +# include + typedef struct dirent X_DIRENT; + +#endif #if HAVE_SENDFILE # if __linux @@ -57,9 +99,9 @@ #define dBUF \ char *aio_buf; \ - LOCK (wrklock); \ + X_LOCK (wrklock); \ self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ - UNLOCK (wrklock); \ + X_UNLOCK (wrklock); \ if (!aio_buf) \ return -1; @@ -71,6 +113,7 @@ REQ_READ, REQ_WRITE, REQ_READAHEAD, REQ_SENDFILE, REQ_STAT, REQ_LSTAT, REQ_FSTAT, + REQ_TRUNCATE, REQ_FTRUNCATE, REQ_UTIME, REQ_FUTIME, REQ_CHMOD, REQ_FCHMOD, REQ_CHOWN, REQ_FCHOWN, @@ -179,7 +222,7 @@ static unsigned int started, idle, wanted; /* worker threads management */ -static mutex_t wrklock = MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ @@ -223,9 +266,9 @@ static volatile unsigned int max_outstanding = 0xffffffff; static int respipe [2]; -static mutex_t reslock = MUTEX_INIT; -static mutex_t reqlock = MUTEX_INIT; -static cond_t reqwait = COND_INIT; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -233,9 +276,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -244,9 +287,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -255,9 +298,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -584,7 +627,7 @@ req_cancel_subs (req); } -static void *aio_proc (void *arg); +X_THREAD_PROC (aio_proc); static void start_thread (void) { @@ -593,7 +636,7 @@ if (!wrk) croak ("unable to allocate worker thread data"); - LOCK (wrklock); + X_LOCK (wrklock); if (thread_create (&wrk->tid, aio_proc, (void *)wrk)) { @@ -606,7 +649,7 @@ else free (wrk); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -627,11 +670,11 @@ ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); unblock_sig (); @@ -647,21 +690,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -686,9 +729,9 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; @@ -722,7 +765,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -738,7 +781,7 @@ } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -788,18 +831,6 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ /* work around various missing functions */ @@ -812,19 +843,19 @@ * normal read/write by using a mutex. slows down execution a lot, * but that's your problem, not mine. */ -static mutex_t preadwritelock = MUTEX_INIT; +static mutex_t preadwritelock = X_MUTEX_INIT; static ssize_t pread (int fd, void *buf, size_t count, off_t offset) { ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = read (fd, buf, count); lseek (fd, ooffset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -834,12 +865,12 @@ ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = write (fd, buf, count); lseek (fd, offset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -882,18 +913,20 @@ static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self) { + size_t todo = count; dBUF; - while (count > 0) + while (todo > 0) { - size_t len = count < AIO_BUFSIZE ? count : AIO_BUFSIZE; + size_t len = todo < AIO_BUFSIZE ? todo : AIO_BUFSIZE; pread (fd, aio_buf, len, offset); offset += len; - count -= len; + todo -= len; } errno = 0; + return count; } #endif @@ -901,14 +934,14 @@ #if !HAVE_READDIR_R # define readdir_r aio_readdir_r -static mutex_t readdirlock = MUTEX_INIT; +static mutex_t readdirlock = X_MUTEX_INIT; -static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res) +static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res) { - struct dirent *e; + X_DIRENT *e; int errorno; - LOCK (readdirlock); + X_LOCK (readdirlock); e = readdir (dirp); errorno = errno; @@ -921,7 +954,7 @@ else *res = 0; - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); errno = errorno; return e ? 0 : -1; @@ -1027,22 +1060,21 @@ DIR *dirp; union { - struct dirent d; - char b [offsetof (struct dirent, d_name) + NAME_MAX + 1]; + X_DIRENT d; + char b [offsetof (X_DIRENT, d_name) + NAME_MAX + 1]; } *u; - struct dirent *entp; + X_DIRENT *entp; char *name, *names; int memlen = 4096; int memofs = 0; int res = 0; - int errorno; - LOCK (wrklock); + X_LOCK (wrklock); self->dirp = dirp = opendir (req->ptr1); self->dbuf = u = malloc (sizeof (*u)); req->flags |= FLAG_PTR2_FREE; req->ptr2 = names = malloc (memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (dirp && u && names) for (;;) @@ -1064,9 +1096,9 @@ while (memofs + len > memlen) { memlen *= 2; - LOCK (wrklock); + X_LOCK (wrklock); req->ptr2 = names = realloc (names, memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (!names) break; @@ -1085,21 +1117,21 @@ /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (aio_proc) { + {//D aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; - /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + /* try to distribute timeouts somewhat randomly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -1110,21 +1142,21 @@ ++idle; - if (COND_TIMEDWAIT (reqwait, reqlock, ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - COND_WAIT (reqwait, reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -1133,7 +1165,7 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); errno = 0; /* strictly unnecessary */ @@ -1158,6 +1190,8 @@ case REQ_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break; case REQ_CHMOD: req->result = chmod (req->ptr1, req->mode); break; case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break; + case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break; + case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; case REQ_CLOSE: req->result = close (req->int1); break; @@ -1175,14 +1209,19 @@ case REQ_READDIR: scandir_ (req, self); break; case REQ_BUSY: +#ifdef _WIN32 + Sleep (req->nv1 * 1000.); +#else { struct timeval tv; tv.tv_sec = req->nv1; - tv.tv_usec = (req->nv1 - tv.tv_usec) * 1000000.; + tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.; req->result = select (0, 0, 0, 0, &tv); } +#endif + break; case REQ_UTIME: case REQ_FUTIME: @@ -1222,7 +1261,7 @@ req->errorno = errno; - LOCK (reslock); + X_LOCK (reslock); ++npending; @@ -1239,43 +1278,44 @@ self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; + }//D } /*****************************************************************************/ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); #if !HAVE_PREADWRITE - LOCK (preadwritelock); + X_LOCK (preadwritelock); #endif #if !HAVE_READDIR_R - LOCK (readdirlock); + X_LOCK (readdirlock); #endif } static void atfork_parent (void) { #if !HAVE_READDIR_R - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); #endif #if !HAVE_PREADWRITE - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); #endif - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -1307,7 +1347,9 @@ close (respipe [0]); close (respipe [1]); - create_pipe (); + + if (!create_pipe (respipe)) + croak ("cannot set result pipe to nonblocking mode"); atfork_parent (); } @@ -1346,11 +1388,22 @@ newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + X_MUTEX_CHECK (reqwait); + X_MUTEX_CHECK (preadwritelock); + X_MUTEX_CHECK (readdirlock); +#else newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); +#endif + + if (!create_pipe (respipe)) + croak ("cannot set result pipe to nonblocking mode"); - create_pipe (); - ATFORK (atfork_prepare, atfork_parent, atfork_child); + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); } void @@ -1593,6 +1646,29 @@ } void +aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef) + PPCODE: +{ + dREQ; + + req->sv1 = newSVsv (fh_or_path); + req->offs = SvOK (offset) ? SvVAL64 (offset) : -1; + + if (SvPOK (fh_or_path)) + { + req->type = REQ_TRUNCATE; + req->ptr1 = SvPVbyte_nolen (req->sv1); + } + else + { + req->type = REQ_FTRUNCATE; + req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path))); + } + + REQ_SEND; +} + +void aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef) PPCODE: { @@ -1811,10 +1887,10 @@ if (block_sig_level) croak ("cannot call IO::AIO::setsig from within aio_block/callback"); - LOCK (reslock); + X_LOCK (reslock); main_tid = pthread_self (); main_sig = signum; - UNLOCK (reslock); + X_UNLOCK (reslock); if (main_sig && npending) pthread_kill (main_tid, main_sig); @@ -1868,9 +1944,9 @@ nthreads() PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL