--- IO-AIO/AIO.xs 2007/06/01 13:25:51 1.101 +++ IO-AIO/AIO.xs 2007/07/08 11:05:36 1.104 @@ -6,21 +6,65 @@ #include "perl.h" #include "XSUB.h" -#include "autoconf/config.h" - #include #include #include -#include -#include #include #include #include -#include #include -#include #include -#include + +#ifdef _WIN32 + +# define SIGIO 0 + typedef Direntry_t X_DIRENT; +#undef malloc +#undef free + +// perl overrides all those nice win32 functions +# undef open +# undef read +# undef write +# undef send +# undef recv +# undef stat +# undef fstat +# define lstat stat +# undef truncate +# undef ftruncate +# undef open +# undef close +# undef unlink +# undef rmdir +# undef rename +# undef lseek + +# define chown(a,b,c) (errno = ENOSYS, -1) +# define fchown(a,b,c) (errno = ENOSYS, -1) +# define fchmod(a,b) (errno = ENOSYS, -1) +# define symlink(a,b) (errno = ENOSYS, -1) +# define readlink(a,b,c) (errno = ENOSYS, -1) +# define mknod(a,b,c) (errno = ENOSYS, -1) +# define truncate(a,b) (errno = ENOSYS, -1) +# define ftruncate(fd,o) chsize ((fd), (o)) +# define fsync(fd) _commit (fd) +# define opendir(fd) (errno = ENOSYS, 0) +# define readdir(fd) (errno = ENOSYS, -1) +# define closedir(fd) (errno = ENOSYS, -1) +# define mkdir(a,b) mkdir (a) + +#else + +# include "autoconf/config.h" +# include +# include +# include +# include +# include + typedef struct dirent X_DIRENT; + +#endif #if HAVE_SENDFILE # if __linux @@ -57,9 +101,9 @@ #define dBUF \ char *aio_buf; \ - LOCK (wrklock); \ + X_LOCK (wrklock); \ self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ - UNLOCK (wrklock); \ + X_UNLOCK (wrklock); \ if (!aio_buf) \ return -1; @@ -71,6 +115,7 @@ REQ_READ, REQ_WRITE, REQ_READAHEAD, REQ_SENDFILE, REQ_STAT, REQ_LSTAT, REQ_FSTAT, + REQ_TRUNCATE, REQ_FTRUNCATE, REQ_UTIME, REQ_FUTIME, REQ_CHMOD, REQ_FCHMOD, REQ_CHOWN, REQ_FCHOWN, @@ -179,7 +224,7 @@ static unsigned int started, idle, wanted; /* worker threads management */ -static mutex_t wrklock = MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ @@ -221,11 +266,11 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe [2], respipe_osf [2]; -static mutex_t reslock = MUTEX_INIT; -static mutex_t reqlock = MUTEX_INIT; -static cond_t reqwait = COND_INIT; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -233,9 +278,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -244,9 +289,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -255,9 +300,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -584,7 +629,32 @@ req_cancel_subs (req); } -static void *aio_proc (void *arg); +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif + +static void +create_pipe (int fd[2]) +{ +#ifdef _WIN32 + int arg = 1; + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, fd) + || ioctlsocket (TO_SOCKET (fd [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (fd [1]), FIONBIO, &arg)) +#else + if (pipe (fd) + || fcntl (fd [0], F_SETFL, O_NONBLOCK) + || fcntl (fd [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + +X_THREAD_PROC (aio_proc); static void start_thread (void) { @@ -593,7 +663,7 @@ if (!wrk) croak ("unable to allocate worker thread data"); - LOCK (wrklock); + X_LOCK (wrklock); if (thread_create (&wrk->tid, aio_proc, (void *)wrk)) { @@ -606,7 +676,7 @@ else free (wrk); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -627,11 +697,11 @@ ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); unblock_sig (); @@ -647,21 +717,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -686,19 +756,19 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -722,7 +792,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -733,12 +803,12 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (PerlSock_recv (respipe [0], buf, 4, 0) == 4) ; } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -788,18 +858,6 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ /* work around various missing functions */ @@ -812,19 +870,19 @@ * normal read/write by using a mutex. slows down execution a lot, * but that's your problem, not mine. */ -static mutex_t preadwritelock = MUTEX_INIT; +static mutex_t preadwritelock = X_MUTEX_INIT; static ssize_t pread (int fd, void *buf, size_t count, off_t offset) { ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = read (fd, buf, count); lseek (fd, ooffset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -834,12 +892,12 @@ ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = write (fd, buf, count); lseek (fd, offset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -882,18 +940,20 @@ static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self) { + size_t todo = count; dBUF; - while (count > 0) + while (todo > 0) { - size_t len = count < AIO_BUFSIZE ? count : AIO_BUFSIZE; + size_t len = todo < AIO_BUFSIZE ? todo : AIO_BUFSIZE; pread (fd, aio_buf, len, offset); offset += len; - count -= len; + todo -= len; } errno = 0; + return count; } #endif @@ -901,14 +961,14 @@ #if !HAVE_READDIR_R # define readdir_r aio_readdir_r -static mutex_t readdirlock = MUTEX_INIT; +static mutex_t readdirlock = X_MUTEX_INIT; -static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res) +static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res) { - struct dirent *e; + X_DIRENT *e; int errorno; - LOCK (readdirlock); + X_LOCK (readdirlock); e = readdir (dirp); errorno = errno; @@ -921,7 +981,7 @@ else *res = 0; - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); errno = errorno; return e ? 0 : -1; @@ -1027,22 +1087,21 @@ DIR *dirp; union { - struct dirent d; - char b [offsetof (struct dirent, d_name) + NAME_MAX + 1]; + X_DIRENT d; + char b [offsetof (X_DIRENT, d_name) + NAME_MAX + 1]; } *u; - struct dirent *entp; + X_DIRENT *entp; char *name, *names; int memlen = 4096; int memofs = 0; int res = 0; - int errorno; - LOCK (wrklock); + X_LOCK (wrklock); self->dirp = dirp = opendir (req->ptr1); self->dbuf = u = malloc (sizeof (*u)); req->flags |= FLAG_PTR2_FREE; req->ptr2 = names = malloc (memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (dirp && u && names) for (;;) @@ -1064,9 +1123,9 @@ while (memofs + len > memlen) { memlen *= 2; - LOCK (wrklock); + X_LOCK (wrklock); req->ptr2 = names = realloc (names, memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (!names) break; @@ -1085,21 +1144,21 @@ /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (aio_proc) { + {//D aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; - /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + /* try to distribute timeouts somewhat randomly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -1110,21 +1169,21 @@ ++idle; - if (COND_TIMEDWAIT (reqwait, reqlock, ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - COND_WAIT (reqwait, reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -1133,7 +1192,7 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); errno = 0; /* strictly unnecessary */ @@ -1158,6 +1217,8 @@ case REQ_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break; case REQ_CHMOD: req->result = chmod (req->ptr1, req->mode); break; case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break; + case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break; + case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; case REQ_CLOSE: req->result = close (req->int1); break; @@ -1175,14 +1236,19 @@ case REQ_READDIR: scandir_ (req, self); break; case REQ_BUSY: +#ifdef _WIN32 + Sleep (req->nv1 * 1000.); +#else { struct timeval tv; tv.tv_sec = req->nv1; - tv.tv_usec = (req->nv1 - tv.tv_usec) * 1000000.; + tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.; req->result = select (0, 0, 0, 0, &tv); } +#endif + break; case REQ_UTIME: case REQ_FUTIME: @@ -1222,14 +1288,14 @@ req->errorno = errno; - LOCK (reslock); + X_LOCK (reslock); ++npending; if (!reqq_push (&res_queue, req)) { /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + send (respipe_osf [1], (const void *)&respipe_osf, 1, 0); /* optionally signal the main thread asynchronously */ if (main_sig) @@ -1239,43 +1305,44 @@ self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; + }//D } /*****************************************************************************/ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); #if !HAVE_PREADWRITE - LOCK (preadwritelock); + X_LOCK (preadwritelock); #endif #if !HAVE_READDIR_R - LOCK (readdirlock); + X_LOCK (readdirlock); #endif } static void atfork_parent (void) { #if !HAVE_READDIR_R - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); #endif #if !HAVE_PREADWRITE - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); #endif - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -1305,9 +1372,10 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - create_pipe (); + PerlSock_closesocket (respipe [0]); + PerlSock_closesocket (respipe [1]); + + create_pipe (respipe); atfork_parent (); } @@ -1346,11 +1414,23 @@ newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + X_MUTEX_CHECK (reqwait); + X_MUTEX_CHECK (preadwritelock); + X_MUTEX_CHECK (readdirlock); + + X_COND_CHECK (reqwait); +#else newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); +#endif + + create_pipe (respipe); - create_pipe (); - ATFORK (atfork_prepare, atfork_parent, atfork_child); + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); } void @@ -1423,7 +1503,7 @@ } void -aio_read (SV *fh, SV *offset, UV length, SV8 *data, UV dataoffset, SV *callback=&PL_sv_undef) +aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef) ALIAS: aio_read = REQ_READ aio_write = REQ_WRITE @@ -1432,6 +1512,7 @@ { STRLEN svlen; char *svptr = SvPVbyte (data, svlen); + UV len = SvUV (length); SvUPGRADE (data, SVt_PV); SvPOK_on (data); @@ -1440,21 +1521,21 @@ dataoffset += svlen; if (dataoffset < 0 || dataoffset > svlen) - croak ("data offset outside of string"); + croak ("dataoffset outside of data scalar"); if (ix == REQ_WRITE) { /* write: check length and adjust. */ - if (length < 0 || length + dataoffset > svlen) - length = svlen - dataoffset; + if (!SvOK (length) || len + dataoffset > svlen) + len = svlen - dataoffset; } else { /* read: grow scalar as necessary */ - svptr = SvGROW (data, length + dataoffset + 1); + svptr = SvGROW (data, len + dataoffset + 1); } - if (length < 0) + if (len < 0) croak ("length must not be negative"); { @@ -1465,7 +1546,7 @@ req->int1 = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh)) : IoOFP (sv_2io (fh))); req->offs = SvOK (offset) ? SvVAL64 (offset) : -1; - req->size = length; + req->size = len; req->sv2 = SvREFCNT_inc (data); req->ptr1 = (char *)svptr + dataoffset; req->stroffset = dataoffset; @@ -1592,6 +1673,29 @@ } void +aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef) + PPCODE: +{ + dREQ; + + req->sv1 = newSVsv (fh_or_path); + req->offs = SvOK (offset) ? SvVAL64 (offset) : -1; + + if (SvPOK (fh_or_path)) + { + req->type = REQ_TRUNCATE; + req->ptr1 = SvPVbyte_nolen (req->sv1); + } + else + { + req->type = REQ_FTRUNCATE; + req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path))); + } + + REQ_SEND; +} + +void aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef) PPCODE: { @@ -1810,10 +1914,10 @@ if (block_sig_level) croak ("cannot call IO::AIO::setsig from within aio_block/callback"); - LOCK (reslock); + X_LOCK (reslock); main_tid = pthread_self (); main_sig = signum; - UNLOCK (reslock); + X_UNLOCK (reslock); if (main_sig && npending) pthread_kill (main_tid, main_sig); @@ -1867,9 +1971,9 @@ nthreads() PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL