--- Linux-AIO/AIO.xs 2004/05/05 10:13:30 1.16 +++ Linux-AIO/AIO.xs 2004/07/18 10:55:34 1.20 @@ -1,3 +1,5 @@ +#define PERL_NO_GET_CONTEXT + #include "EXTERN.h" #include "perl.h" #include "XSUB.h" @@ -22,13 +24,19 @@ #define STACKSIZE 1024 /* yeah */ -enum { REQ_QUIT, REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, REQ_STAT, REQ_LSTAT, REQ_FSTAT}; +enum { + REQ_QUIT, + REQ_OPEN, REQ_CLOSE, REQ_READ, REQ_WRITE, + REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_UNLINK +}; typedef struct { char stack[STACKSIZE]; } aio_thread; -typedef struct { +typedef struct aio_cb { + struct aio_cb *next; + int type; aio_thread *thread; @@ -51,10 +59,12 @@ static int nreqs; static int reqpipe[2], respipe[2]; +static aio_req qs, qe; /* queue start, queue end */ + static int aio_proc(void *arg); static void -start_thread(void) +start_thread (void) { aio_thread *thr; @@ -62,7 +72,7 @@ if (clone (aio_proc, &(thr->stack[STACKSIZE]), - CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND, + CLONE_VM|CLONE_FS|CLONE_FILES, thr) >= 0) started++; else @@ -70,24 +80,44 @@ } static void -end_thread(void) +send_reqs (void) { - aio_req req; - New (0, req, 1, aio_cb); - req->type = REQ_QUIT; - write (reqpipe[1], &req, sizeof (aio_req)); + /* this write is atomic */ + while (qs && write (reqpipe[1], &qs, sizeof qs) == sizeof qs) + { + qs = qs->next; + if (!qs) qe = 0; + } } static void send_req (aio_req req) { nreqs++; - write (reqpipe[1], &req, sizeof (aio_req)); + req->next = 0; + + if (qe) + qe->next = req; + else + qe = qs = req; + + send_reqs (); +} + +static void +end_thread (void) +{ + aio_req req; + New (0, req, 1, aio_cb); + req->type = REQ_QUIT; + + send_req (req); } static void -read_write (pTHX_ int dowrite, int fd, off_t offset, size_t length, - SV *data, STRLEN dataoffset, SV*callback) +read_write (pTHX_ + int dowrite, int fd, off_t offset, size_t length, + SV *data, STRLEN dataoffset, SV *callback) { aio_req req; STRLEN svlen; @@ -197,6 +227,9 @@ Safefree (req); } + if (qs) + send_reqs (); + return count; } @@ -204,9 +237,10 @@ #undef errno #include +#include static int -aio_proc(void *thr_arg) +aio_proc (void *thr_arg) { aio_thread *thr = thr_arg; aio_req req; @@ -227,7 +261,10 @@ _syscall2(int,lstat64, const char *, filename, struct stat64 *, buf) _syscall2(int,fstat64, int, fd, struct stat64 *, buf) + _syscall1(int,unlink, char *, filename); + sigprocmask (SIG_SETMASK, &fullsigset, 0); + prctl (PR_SET_PDEATHSIG, SIGKILL); /* then loop */ while (read (reqpipe[0], (void *)&req, sizeof (req)) == sizeof (req)) @@ -235,24 +272,21 @@ req->thread = thr; errno = 0; /* strictly unnecessary */ - if (req->type == REQ_READ) - req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); - else if (req->type == REQ_WRITE) - req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); - else if (req->type == REQ_OPEN) - req->result = open (req->dataptr, req->fd, req->mode); - else if (req->type == REQ_CLOSE) - req->result = close (req->fd); - else if (req->type == REQ_STAT) - req->result = stat64 (req->dataptr, req->statdata); - else if (req->type == REQ_LSTAT) - req->result = lstat64 (req->dataptr, req->statdata); - else if (req->type == REQ_FSTAT) - req->result = fstat64 (req->fd, req->statdata); - else + switch (req->type) { - write (respipe[1], (void *)&req, sizeof (req)); - break; + case REQ_READ: req->result = pread64 (req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break; + case REQ_WRITE: req->result = pwrite64(req->fd, req->dataptr, req->length, req->offset & 0xffffffff, req->offset >> 32); break; + case REQ_OPEN: req->result = open (req->dataptr, req->fd, req->mode); break; + case REQ_CLOSE: req->result = close (req->fd); break; + case REQ_STAT: req->result = stat64 (req->dataptr, req->statdata); break; + case REQ_LSTAT: req->result = lstat64 (req->dataptr, req->statdata); break; + case REQ_FSTAT: req->result = fstat64 (req->fd, req->statdata); break; + case REQ_UNLINK: req->result = unlink (req->dataptr); break; + + case REQ_QUIT: + default: + write (respipe[1], (void *)&req, sizeof (req)); + return 0; } req->errorno = errno; @@ -275,6 +309,9 @@ if (pipe (reqpipe) || pipe (respipe)) croak ("unable to initialize request or result pipe"); + if (fcntl (reqpipe[1], F_SETFL, O_NONBLOCK)) + croak ("cannot set result pipe to nonblocking mode"); + if (fcntl (respipe[0], F_SETFL, O_NONBLOCK)) croak ("cannot set result pipe to nonblocking mode"); } @@ -306,7 +343,7 @@ FD_SET(respipe[0], &rfd); select (respipe[0] + 1, &rfd, 0, 0, 0); - poll_cb (); + poll_cb (aTHX); } void @@ -412,6 +449,26 @@ send_req (req); +void +aio_unlink(pathname,callback) + SV * pathname + SV * callback + PROTOTYPE: $$ + CODE: + aio_req req; + + Newz (0, req, 1, aio_cb); + + if (!req) + croak ("out of memory during aio_req allocation"); + + req->type = REQ_UNLINK; + req->data = newSVsv (pathname); + req->dataptr = SvPV_nolen (req->data); + req->callback = SvREFCNT_inc (callback); + + send_req (req); + int poll_fileno() PROTOTYPE: