--- IO-AIO/AIO.xs 2007/06/01 06:00:40 1.100 +++ IO-AIO/AIO.xs 2007/10/06 14:05:19 1.109 @@ -6,21 +6,65 @@ #include "perl.h" #include "XSUB.h" -#include "autoconf/config.h" - #include #include #include -#include -#include #include #include #include -#include #include -#include #include -#include + +#ifdef _WIN32 + +# define SIGIO 0 + typedef Direntry_t X_DIRENT; +#undef malloc +#undef free + +// perl overrides all those nice win32 functions +# undef open +# undef read +# undef write +# undef send +# undef recv +# undef stat +# undef fstat +# define lstat stat +# undef truncate +# undef ftruncate +# undef open +# undef close +# undef unlink +# undef rmdir +# undef rename +# undef lseek + +# define chown(a,b,c) (errno = ENOSYS, -1) +# define fchown(a,b,c) (errno = ENOSYS, -1) +# define fchmod(a,b) (errno = ENOSYS, -1) +# define symlink(a,b) (errno = ENOSYS, -1) +# define readlink(a,b,c) (errno = ENOSYS, -1) +# define mknod(a,b,c) (errno = ENOSYS, -1) +# define truncate(a,b) (errno = ENOSYS, -1) +# define ftruncate(fd,o) chsize ((fd), (o)) +# define fsync(fd) _commit (fd) +# define opendir(fd) (errno = ENOSYS, 0) +# define readdir(fd) (errno = ENOSYS, -1) +# define closedir(fd) (errno = ENOSYS, -1) +# define mkdir(a,b) mkdir (a) + +#else + +# include "autoconf/config.h" +# include +# include +# include +# include +# include + typedef struct dirent X_DIRENT; + +#endif #if HAVE_SENDFILE # if __linux @@ -48,11 +92,20 @@ /* buffer size for various temporary buffers */ #define AIO_BUFSIZE 65536 +/* use NV for 32 bit perls as it allows larger offsets */ +#if IVSIZE >= 8 +# define SvVAL64 SvIV +#else +# define SvVAL64 SvNV +#endif + +static HV *stash; + #define dBUF \ char *aio_buf; \ - LOCK (wrklock); \ + X_LOCK (wrklock); \ self->dbuf = aio_buf = malloc (AIO_BUFSIZE); \ - UNLOCK (wrklock); \ + X_UNLOCK (wrklock); \ if (!aio_buf) \ return -1; @@ -61,12 +114,13 @@ enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, - REQ_READ, REQ_WRITE, REQ_READAHEAD, - REQ_SENDFILE, + REQ_READ, REQ_WRITE, + REQ_READAHEAD, REQ_SENDFILE, REQ_STAT, REQ_LSTAT, REQ_FSTAT, - REQ_UTIME, REQ_FUTIME, /* must be consecutive */ - REQ_CHMOD, REQ_FCHMOD, /* must be consecutive */ - REQ_CHOWN, REQ_FCHOWN, /* must be consecutive */ + REQ_TRUNCATE, REQ_FTRUNCATE, + REQ_UTIME, REQ_FUTIME, + REQ_CHMOD, REQ_FCHMOD, + REQ_CHOWN, REQ_FCHOWN, REQ_FSYNC, REQ_FDATASYNC, REQ_UNLINK, REQ_RMDIR, REQ_MKDIR, REQ_RENAME, REQ_MKNOD, REQ_READDIR, @@ -172,7 +226,7 @@ static unsigned int started, idle, wanted; /* worker threads management */ -static mutex_t wrklock = MUTEX_INIT; +static mutex_t wrklock = X_MUTEX_INIT; typedef struct worker { /* locked by wrklock */ @@ -214,11 +268,11 @@ static volatile unsigned int nreqs, nready, npending; static volatile unsigned int max_idle = 4; static volatile unsigned int max_outstanding = 0xffffffff; -static int respipe [2]; +static int respipe_osf [2], respipe [2] = { -1, -1 }; -static mutex_t reslock = MUTEX_INIT; -static mutex_t reqlock = MUTEX_INIT; -static cond_t reqwait = COND_INIT; +static mutex_t reslock = X_MUTEX_INIT; +static mutex_t reqlock = X_MUTEX_INIT; +static cond_t reqwait = X_COND_INIT; #if WORDACCESS_UNSAFE @@ -226,9 +280,9 @@ { unsigned int retval; - LOCK (reqlock); + X_LOCK (reqlock); retval = nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); return retval; } @@ -237,9 +291,9 @@ { unsigned int retval; - LOCK (reslock); + X_LOCK (reslock); retval = npending; - UNLOCK (reslock); + X_UNLOCK (reslock); return retval; } @@ -248,9 +302,9 @@ { unsigned int retval; - LOCK (wrklock); + X_LOCK (wrklock); retval = started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); return retval; } @@ -319,7 +373,7 @@ static int poll_cb (); static int req_invoke (aio_req req); -static void req_free (aio_req req); +static void req_destroy (aio_req req); static void req_cancel (aio_req req); /* must be called at most once */ @@ -395,12 +449,12 @@ if (!req_invoke (grp)) { - req_free (grp); + req_destroy (grp); unblock_sig (); croak (0); } - req_free (grp); + req_destroy (grp); unblock_sig (); } } @@ -451,16 +505,31 @@ case REQ_OPEN: { /* convert fd to fh */ - SV *fh; + SV *fh = &PL_sv_undef; - PUSHs (sv_2mortal (newSViv (req->result))); - PUTBACK; - call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); - SPAGAIN; - - fh = POPs; - PUSHMARK (SP); - XPUSHs (fh); + if (req->result >= 0) + { + GV *gv = (GV *)sv_newmortal (); + int flags = req->int1 & (O_RDONLY | O_WRONLY | O_RDWR); + char sym [64]; + int symlen; + + symlen = snprintf (sym, sizeof (sym), "fd#%d", req->result); + gv_init (gv, stash, sym, symlen, 0); + + symlen = snprintf ( + sym, + sizeof (sym), + "%s&=%d", + flags == O_RDONLY ? "<" : flags == O_WRONLY ? ">" : "+<", + req->result + ); + + if (do_open (gv, sym, symlen, 0, 0, 0, 0)) + fh = (SV *)gv; + } + + PUSHs (fh); } break; @@ -514,11 +583,13 @@ errno = req->errorno; PUTBACK; - call_sv (req->callback, G_VOID | G_EVAL); + call_sv (req->callback, G_VOID | G_EVAL | G_DISCARD); SPAGAIN; FREETMPS; LEAVE; + + PUTBACK; } if (req->grp) @@ -538,7 +609,7 @@ return !SvTRUE (ERRSV); } -static void req_free (aio_req req) +static void req_destroy (aio_req req) { if (req->self) { @@ -577,7 +648,51 @@ req_cancel_subs (req); } -static void *aio_proc(void *arg); +#ifdef USE_SOCKETS_AS_HANDLES +# define TO_SOCKET(x) (win32_get_osfhandle (x)) +#else +# define TO_SOCKET(x) (x) +#endif + +static void +create_respipe () +{ + int old_readfd = respipe [0]; + + if (respipe [1] >= 0) + respipe_close (TO_SOCKET (respipe [1])); + +#ifdef _WIN32 + if (PerlSock_socketpair (AF_UNIX, SOCK_STREAM, 0, respipe)) +#else + if (pipe (respipe)) +#endif + croak ("unable to initialize result pipe"); + + if (old_readfd >= 0) + { + if (dup2 (TO_SOCKET (respipe [0]), TO_SOCKET (old_readfd)) < 0) + croak ("unable to initialize result pipe(2)"); + + respipe_close (respipe [0]); + respipe [0] = old_readfd; + } + +#ifdef _WIN32 + int arg = 1; + if (ioctlsocket (TO_SOCKET (respipe [0]), FIONBIO, &arg) + || ioctlsocket (TO_SOCKET (respipe [1]), FIONBIO, &arg)) +#else + if (fcntl (respipe [0], F_SETFL, O_NONBLOCK) + || fcntl (respipe [1], F_SETFL, O_NONBLOCK)) +#endif + croak ("unable to initialize result pipe(3)"); + + respipe_osf [0] = TO_SOCKET (respipe [0]); + respipe_osf [1] = TO_SOCKET (respipe [1]); +} + +X_THREAD_PROC (aio_proc); static void start_thread (void) { @@ -586,7 +701,7 @@ if (!wrk) croak ("unable to allocate worker thread data"); - LOCK (wrklock); + X_LOCK (wrklock); if (thread_create (&wrk->tid, aio_proc, (void *)wrk)) { @@ -599,7 +714,7 @@ else free (wrk); - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void maybe_start_thread () @@ -620,11 +735,11 @@ ++nreqs; - LOCK (reqlock); + X_LOCK (reqlock); ++nready; reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); unblock_sig (); @@ -640,21 +755,21 @@ req->type = REQ_QUIT; req->pri = PRI_MAX + PRI_BIAS; - LOCK (reqlock); + X_LOCK (reqlock); reqq_push (&req_queue, req); - COND_SIGNAL (reqwait); - UNLOCK (reqlock); + X_COND_SIGNAL (reqwait); + X_UNLOCK (reqlock); - LOCK (wrklock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); } static void set_max_idle (int nthreads) { - if (WORDACCESS_UNSAFE) LOCK (reqlock); + if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads <= 0 ? 1 : nthreads; - if (WORDACCESS_UNSAFE) UNLOCK (reqlock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } static void min_parallel (int nthreads) @@ -679,19 +794,19 @@ while (nreqs) { int size; - if (WORDACCESS_UNSAFE) LOCK (reslock); + if (WORDACCESS_UNSAFE) X_LOCK (reslock); size = res_queue.size; - if (WORDACCESS_UNSAFE) UNLOCK (reslock); + if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); if (size) return; maybe_start_thread (); - FD_ZERO(&rfd); - FD_SET(respipe [0], &rfd); + FD_ZERO (&rfd); + FD_SET (respipe [0], &rfd); - select (respipe [0] + 1, &rfd, 0, 0, 0); + PerlSock_select (respipe [0] + 1, &rfd, 0, 0, 0); } } @@ -715,7 +830,7 @@ { maybe_start_thread (); - LOCK (reslock); + X_LOCK (reslock); req = reqq_shift (&res_queue); if (req) @@ -726,12 +841,12 @@ { /* read any signals sent by the worker threads */ char buf [4]; - while (read (respipe [0], buf, 4) == 4) + while (respipe_read (respipe [0], buf, 4) == 4) ; } } - UNLOCK (reslock); + X_UNLOCK (reslock); if (!req) break; @@ -747,7 +862,7 @@ { if (!req_invoke (req)) { - req_free (req); + req_destroy (req); unblock_sig (); croak (0); } @@ -755,7 +870,7 @@ count++; } - req_free (req); + req_destroy (req); if (maxreqs && !--maxreqs) break; @@ -781,18 +896,6 @@ return count; } -static void create_pipe () -{ - if (pipe (respipe)) - croak ("unable to initialize result pipe"); - - if (fcntl (respipe [0], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); - - if (fcntl (respipe [1], F_SETFL, O_NONBLOCK)) - croak ("cannot set result pipe to nonblocking mode"); -} - /*****************************************************************************/ /* work around various missing functions */ @@ -805,19 +908,19 @@ * normal read/write by using a mutex. slows down execution a lot, * but that's your problem, not mine. */ -static mutex_t preadwritelock = MUTEX_INIT; +static mutex_t preadwritelock = X_MUTEX_INIT; static ssize_t pread (int fd, void *buf, size_t count, off_t offset) { ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = read (fd, buf, count); lseek (fd, ooffset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -827,12 +930,12 @@ ssize_t res; off_t ooffset; - LOCK (preadwritelock); + X_LOCK (preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = write (fd, buf, count); lseek (fd, offset, SEEK_SET); - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); return res; } @@ -875,18 +978,20 @@ static ssize_t aio_readahead (int fd, off_t offset, size_t count, worker *self) { + size_t todo = count; dBUF; - while (count > 0) + while (todo > 0) { - size_t len = count < AIO_BUFSIZE ? count : AIO_BUFSIZE; + size_t len = todo < AIO_BUFSIZE ? todo : AIO_BUFSIZE; pread (fd, aio_buf, len, offset); offset += len; - count -= len; + todo -= len; } errno = 0; + return count; } #endif @@ -894,14 +999,14 @@ #if !HAVE_READDIR_R # define readdir_r aio_readdir_r -static mutex_t readdirlock = MUTEX_INIT; +static mutex_t readdirlock = X_MUTEX_INIT; -static int readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res) +static int readdir_r (DIR *dirp, X_DIRENT *ent, X_DIRENT **res) { - struct dirent *e; + X_DIRENT *e; int errorno; - LOCK (readdirlock); + X_LOCK (readdirlock); e = readdir (dirp); errorno = errno; @@ -914,7 +1019,7 @@ else *res = 0; - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); errno = errorno; return e ? 0 : -1; @@ -1020,22 +1125,21 @@ DIR *dirp; union { - struct dirent d; - char b [offsetof (struct dirent, d_name) + NAME_MAX + 1]; + X_DIRENT d; + char b [offsetof (X_DIRENT, d_name) + NAME_MAX + 1]; } *u; - struct dirent *entp; + X_DIRENT *entp; char *name, *names; int memlen = 4096; int memofs = 0; int res = 0; - int errorno; - LOCK (wrklock); + X_LOCK (wrklock); self->dirp = dirp = opendir (req->ptr1); self->dbuf = u = malloc (sizeof (*u)); req->flags |= FLAG_PTR2_FREE; req->ptr2 = names = malloc (memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (dirp && u && names) for (;;) @@ -1057,9 +1161,9 @@ while (memofs + len > memlen) { memlen *= 2; - LOCK (wrklock); + X_LOCK (wrklock); req->ptr2 = names = realloc (names, memlen); - UNLOCK (wrklock); + X_UNLOCK (wrklock); if (!names) break; @@ -1078,21 +1182,21 @@ /*****************************************************************************/ -static void *aio_proc (void *thr_arg) +X_THREAD_PROC (aio_proc) { + {//D aio_req req; struct timespec ts; worker *self = (worker *)thr_arg; - /* try to distribute timeouts somewhat evenly */ - ts.tv_nsec = (((unsigned long)self + (unsigned long)ts.tv_sec) & 1023UL) - * (1000000000UL / 1024UL); + /* try to distribute timeouts somewhat randomly */ + ts.tv_nsec = ((unsigned long)self & 1023UL) * (1000000000UL / 1024UL); for (;;) { ts.tv_sec = time (0) + IDLE_TIMEOUT; - LOCK (reqlock); + X_LOCK (reqlock); for (;;) { @@ -1103,21 +1207,21 @@ ++idle; - if (COND_TIMEDWAIT (reqwait, reqlock, ts) + if (X_COND_TIMEDWAIT (reqwait, reqlock, ts) == ETIMEDOUT) { if (idle > max_idle) { --idle; - UNLOCK (reqlock); - LOCK (wrklock); + X_UNLOCK (reqlock); + X_LOCK (wrklock); --started; - UNLOCK (wrklock); + X_UNLOCK (wrklock); goto quit; } /* we are allowed to idle, so do so without any timeout */ - COND_WAIT (reqwait, reqlock); + X_COND_WAIT (reqwait, reqlock); ts.tv_sec = time (0) + IDLE_TIMEOUT; } @@ -1126,15 +1230,19 @@ --nready; - UNLOCK (reqlock); + X_UNLOCK (reqlock); errno = 0; /* strictly unnecessary */ if (!(req->flags & FLAG_CANCELLED)) switch (req->type) { - case REQ_READ: req->result = pread (req->int1, req->ptr1, req->size, req->offs); break; - case REQ_WRITE: req->result = pwrite (req->int1, req->ptr1, req->size, req->offs); break; + case REQ_READ: req->result = req->offs >= 0 + ? pread (req->int1, req->ptr1, req->size, req->offs) + : read (req->int1, req->ptr1, req->size); break; + case REQ_WRITE: req->result = req->offs >= 0 + ? pwrite (req->int1, req->ptr1, req->size, req->offs) + : write (req->int1, req->ptr1, req->size); break; case REQ_READAHEAD: req->result = readahead (req->int1, req->offs, req->size); break; case REQ_SENDFILE: req->result = sendfile_ (req->int1, req->int2, req->offs, req->size, self); break; @@ -1147,6 +1255,8 @@ case REQ_FCHOWN: req->result = fchown (req->int1, req->int2, req->int3); break; case REQ_CHMOD: req->result = chmod (req->ptr1, req->mode); break; case REQ_FCHMOD: req->result = fchmod (req->int1, req->mode); break; + case REQ_TRUNCATE: req->result = truncate (req->ptr1, req->offs); break; + case REQ_FTRUNCATE: req->result = ftruncate (req->int1, req->offs); break; case REQ_OPEN: req->result = open (req->ptr1, req->int1, req->mode); break; case REQ_CLOSE: req->result = close (req->int1); break; @@ -1164,14 +1274,19 @@ case REQ_READDIR: scandir_ (req, self); break; case REQ_BUSY: +#ifdef _WIN32 + Sleep (req->nv1 * 1000.); +#else { struct timeval tv; tv.tv_sec = req->nv1; - tv.tv_usec = (req->nv1 - tv.tv_usec) * 1000000.; + tv.tv_usec = (req->nv1 - tv.tv_sec) * 1000000.; req->result = select (0, 0, 0, 0, &tv); } +#endif + break; case REQ_UTIME: case REQ_FUTIME: @@ -1211,14 +1326,14 @@ req->errorno = errno; - LOCK (reslock); + X_LOCK (reslock); ++npending; if (!reqq_push (&res_queue, req)) { /* write a dummy byte to the pipe so fh becomes ready */ - write (respipe [1], &respipe, 1); + respipe_write (respipe_osf [1], (const void *)&respipe_osf, 1); /* optionally signal the main thread asynchronously */ if (main_sig) @@ -1228,43 +1343,44 @@ self->req = 0; worker_clear (self); - UNLOCK (reslock); + X_UNLOCK (reslock); } quit: - LOCK (wrklock); + X_LOCK (wrklock); worker_free (self); - UNLOCK (wrklock); + X_UNLOCK (wrklock); return 0; + }//D } /*****************************************************************************/ static void atfork_prepare (void) { - LOCK (wrklock); - LOCK (reqlock); - LOCK (reslock); + X_LOCK (wrklock); + X_LOCK (reqlock); + X_LOCK (reslock); #if !HAVE_PREADWRITE - LOCK (preadwritelock); + X_LOCK (preadwritelock); #endif #if !HAVE_READDIR_R - LOCK (readdirlock); + X_LOCK (readdirlock); #endif } static void atfork_parent (void) { #if !HAVE_READDIR_R - UNLOCK (readdirlock); + X_UNLOCK (readdirlock); #endif #if !HAVE_PREADWRITE - UNLOCK (preadwritelock); + X_UNLOCK (preadwritelock); #endif - UNLOCK (reslock); - UNLOCK (reqlock); - UNLOCK (wrklock); + X_UNLOCK (reslock); + X_UNLOCK (reqlock); + X_UNLOCK (wrklock); } static void atfork_child (void) @@ -1272,17 +1388,17 @@ aio_req prv; while (prv = reqq_shift (&req_queue)) - req_free (prv); + req_destroy (prv); while (prv = reqq_shift (&res_queue)) - req_free (prv); + req_destroy (prv); while (wrk_first.next != &wrk_first) { worker *wrk = wrk_first.next; if (wrk->req) - req_free (wrk->req); + req_destroy (wrk->req); worker_clear (wrk); worker_free (wrk); @@ -1294,9 +1410,7 @@ nready = 0; npending = 0; - close (respipe [0]); - close (respipe [1]); - create_pipe (); + create_respipe (); atfork_parent (); } @@ -1328,18 +1442,30 @@ BOOT: { - HV *stash = gv_stashpv ("IO::AIO", 1); + stash = gv_stashpv ("IO::AIO", 1); newCONSTSUB (stash, "EXDEV", newSViv (EXDEV)); newCONSTSUB (stash, "O_RDONLY", newSViv (O_RDONLY)); newCONSTSUB (stash, "O_WRONLY", newSViv (O_WRONLY)); newCONSTSUB (stash, "O_CREAT", newSViv (O_CREAT)); newCONSTSUB (stash, "O_TRUNC", newSViv (O_TRUNC)); +#ifdef _WIN32 + X_MUTEX_CHECK (wrklock); + X_MUTEX_CHECK (reslock); + X_MUTEX_CHECK (reqlock); + X_MUTEX_CHECK (reqwait); + X_MUTEX_CHECK (preadwritelock); + X_MUTEX_CHECK (readdirlock); + + X_COND_CHECK (reqwait); +#else newCONSTSUB (stash, "S_IFIFO", newSViv (S_IFIFO)); newCONSTSUB (stash, "SIGIO", newSViv (SIGIO)); +#endif - create_pipe (); - ATFORK (atfork_prepare, atfork_parent, atfork_child); + create_respipe (); + + X_THREAD_ATFORK (atfork_prepare, atfork_parent, atfork_child); } void @@ -1394,10 +1520,9 @@ } void -aio_close (SV *fh, SV *callback=&PL_sv_undef) +aio_fsync (SV *fh, SV *callback=&PL_sv_undef) PROTOTYPE: $;$ ALIAS: - aio_close = REQ_CLOSE aio_fsync = REQ_FSYNC aio_fdatasync = REQ_FDATASYNC PPCODE: @@ -1411,8 +1536,29 @@ REQ_SEND (req); } +int +_dup (int fd) + PROTOTYPE: $ + CODE: + RETVAL = dup (fd); + OUTPUT: + RETVAL + void -aio_read (SV *fh, UV offset, UV length, SV8 *data, UV dataoffset, SV *callback=&PL_sv_undef) +_aio_close (int fd, SV *callback=&PL_sv_undef) + PROTOTYPE: $;$ + PPCODE: +{ + dREQ; + + req->type = REQ_CLOSE; + req->int1 = fd; + + REQ_SEND (req); +} + +void +aio_read (SV *fh, SV *offset, SV *length, SV8 *data, IV dataoffset, SV *callback=&PL_sv_undef) ALIAS: aio_read = REQ_READ aio_write = REQ_WRITE @@ -1421,6 +1567,7 @@ { STRLEN svlen; char *svptr = SvPVbyte (data, svlen); + UV len = SvUV (length); SvUPGRADE (data, SVt_PV); SvPOK_on (data); @@ -1429,21 +1576,21 @@ dataoffset += svlen; if (dataoffset < 0 || dataoffset > svlen) - croak ("data offset outside of string"); + croak ("dataoffset outside of data scalar"); if (ix == REQ_WRITE) { /* write: check length and adjust. */ - if (length < 0 || length + dataoffset > svlen) - length = svlen - dataoffset; + if (!SvOK (length) || len + dataoffset > svlen) + len = svlen - dataoffset; } else { /* read: grow scalar as necessary */ - svptr = SvGROW (data, length + dataoffset + 1); + svptr = SvGROW (data, len + dataoffset + 1); } - if (length < 0) + if (len < 0) croak ("length must not be negative"); { @@ -1453,8 +1600,8 @@ req->sv1 = newSVsv (fh); req->int1 = PerlIO_fileno (ix == REQ_READ ? IoIFP (sv_2io (fh)) : IoOFP (sv_2io (fh))); - req->offs = offset; - req->size = length; + req->offs = SvOK (offset) ? SvVAL64 (offset) : -1; + req->size = len; req->sv2 = SvREFCNT_inc (data); req->ptr1 = (char *)svptr + dataoffset; req->stroffset = dataoffset; @@ -1490,7 +1637,7 @@ } void -aio_sendfile (SV *out_fh, SV *in_fh, UV in_offset, UV length, SV *callback=&PL_sv_undef) +aio_sendfile (SV *out_fh, SV *in_fh, SV *in_offset, UV length, SV *callback=&PL_sv_undef) PROTOTYPE: $$$$;$ PPCODE: { @@ -1501,14 +1648,14 @@ req->int1 = PerlIO_fileno (IoIFP (sv_2io (out_fh))); req->sv2 = newSVsv (in_fh); req->int2 = PerlIO_fileno (IoIFP (sv_2io (in_fh))); - req->offs = in_offset; + req->offs = SvVAL64 (in_offset); req->size = length; REQ_SEND; } void -aio_readahead (SV *fh, UV offset, IV length, SV *callback=&PL_sv_undef) +aio_readahead (SV *fh, SV *offset, IV length, SV *callback=&PL_sv_undef) PROTOTYPE: $$$;$ PPCODE: { @@ -1517,7 +1664,7 @@ req->type = REQ_READAHEAD; req->sv1 = newSVsv (fh); req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh))); - req->offs = offset; + req->offs = SvVAL64 (offset); req->size = length; REQ_SEND; @@ -1535,7 +1682,7 @@ req->ptr2 = malloc (sizeof (Stat_t)); if (!req->ptr2) { - req_free (req); + req_destroy (req); croak ("out of memory during aio_stat statdata allocation"); } @@ -1581,6 +1728,29 @@ } void +aio_truncate (SV8 *fh_or_path, SV *offset, SV *callback=&PL_sv_undef) + PPCODE: +{ + dREQ; + + req->sv1 = newSVsv (fh_or_path); + req->offs = SvOK (offset) ? SvVAL64 (offset) : -1; + + if (SvPOK (fh_or_path)) + { + req->type = REQ_TRUNCATE; + req->ptr1 = SvPVbyte_nolen (req->sv1); + } + else + { + req->type = REQ_FTRUNCATE; + req->int1 = PerlIO_fileno (IoIFP (sv_2io (fh_or_path))); + } + + REQ_SEND; +} + +void aio_chmod (SV8 *fh_or_path, int mode, SV *callback=&PL_sv_undef) PPCODE: { @@ -1799,10 +1969,10 @@ if (block_sig_level) croak ("cannot call IO::AIO::setsig from within aio_block/callback"); - LOCK (reslock); + X_LOCK (reslock); main_tid = pthread_self (); main_sig = signum; - UNLOCK (reslock); + X_UNLOCK (reslock); if (main_sig && npending) pthread_kill (main_tid, main_sig); @@ -1856,9 +2026,9 @@ nthreads() PROTOTYPE: CODE: - if (WORDACCESS_UNSAFE) LOCK (wrklock); + if (WORDACCESS_UNSAFE) X_LOCK (wrklock); RETVAL = started; - if (WORDACCESS_UNSAFE) UNLOCK (wrklock); + if (WORDACCESS_UNSAFE) X_UNLOCK (wrklock); OUTPUT: RETVAL