--- IO-AIO/AIO.xs 2005/08/23 01:18:04 1.36 +++ IO-AIO/AIO.xs 2005/08/23 12:37:19 1.37 @@ -9,9 +9,10 @@ #include +#include #include #include - +#include #include #include #include @@ -35,7 +36,7 @@ #if __ia64 # define STACKSIZE 65536 #else -# define STACKSIZE 4096 +# define STACKSIZE 8192 #endif enum { @@ -46,6 +47,7 @@ REQ_STAT, REQ_LSTAT, REQ_FSTAT, REQ_FSYNC, REQ_FDATASYNC, REQ_UNLINK, REQ_RMDIR, + REQ_READDIR, REQ_SYMLINK, }; @@ -54,6 +56,7 @@ int type; + /* should receive a cleanup, with unions */ int fd, fd2; off_t offset; size_t length; @@ -99,6 +102,9 @@ if (req->callback) SvREFCNT_dec (req->callback); + if (req->type == REQ_READDIR && req->result >= 0) + free (req->data2ptr); + Safefree (req); } @@ -172,21 +178,48 @@ ENTER; PUSHMARK (SP); - XPUSHs (sv_2mortal (newSViv (req->result))); - if (req->type == REQ_OPEN) + if (req->type == REQ_READDIR) { - /* convert fd to fh */ - SV *fh; + SV *rv = &PL_sv_undef; - PUTBACK; - call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); - SPAGAIN; + if (req->result >= 0) + { + char *buf = req->data2ptr; + AV *av = newAV (); + + while (req->result) + { + SV *sv = newSVpv (buf, 0); + + av_push (av, sv); + buf += SvCUR (sv) + 1; + req->result--; + } - fh = SvREFCNT_inc (POPs); + rv = sv_2mortal (newRV_noinc ((SV *)av)); + } - PUSHMARK (SP); - XPUSHs (sv_2mortal (fh)); + XPUSHs (rv); + } + else + { + XPUSHs (sv_2mortal (newSViv (req->result))); + + if (req->type == REQ_OPEN) + { + /* convert fd to fh */ + SV *fh; + + PUTBACK; + call_pv ("IO::AIO::_fd2fh", G_SCALAR | G_EVAL); + SPAGAIN; + + fh = SvREFCNT_inc (POPs); + + PUSHMARK (SP); + XPUSHs (sv_2mortal (fh)); + } } if (SvOK (req->callback)) @@ -319,49 +352,6 @@ croak ("cannot set result pipe to nonblocking mode"); } -static void atfork_prepare (void) -{ - pthread_mutex_lock (&reqlock); - pthread_mutex_lock (&reslock); -} - -static void atfork_parent (void) -{ - pthread_mutex_unlock (&reslock); - pthread_mutex_unlock (&reqlock); -} - -static void atfork_child (void) -{ - aio_req prv; - - started = 0; - - while (reqs) - { - prv = reqs; - reqs = prv->next; - free_req (prv); - } - - reqs = reqe = 0; - - while (ress) - { - prv = ress; - ress = prv->next; - free_req (prv); - } - - ress = rese = 0; - - close (respipe [0]); - close (respipe [1]); - create_pipe (); - - atfork_parent (); -} - /*****************************************************************************/ /* work around various missing functions */ @@ -374,7 +364,7 @@ * normal read/write by using a mutex. slows down execution a lot, * but that's your problem, not mine. */ -static pthread_mutex_t iolock = PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t preadwritelock = PTHREAD_MUTEX_INITIALIZER; static ssize_t pread (int fd, void *buf, size_t count, off_t offset) @@ -382,12 +372,12 @@ ssize_t res; off_t ooffset; - pthread_mutex_lock (&iolock); + pthread_mutex_lock (&preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = read (fd, buf, count); lseek (fd, ooffset, SEEK_SET); - pthread_mutex_unlock (&iolock); + pthread_mutex_unlock (&preadwritelock); return res; } @@ -398,12 +388,12 @@ ssize_t res; off_t ooffset; - pthread_mutex_lock (&iolock); + pthread_mutex_lock (&preadwritelock); ooffset = lseek (fd, 0, SEEK_CUR); lseek (fd, offset, SEEK_SET); res = write (fd, buf, count); lseek (fd, offset, SEEK_SET); - pthread_mutex_unlock (&iolock); + pthread_mutex_unlock (&preadwritelock); return res; } @@ -416,11 +406,11 @@ #if !HAVE_READAHEAD # define readahead aio_readahead -static char readahead_buf[4096]; - static ssize_t readahead (int fd, off_t offset, size_t count) { + char readahead_buf[4096]; + while (count > 0) { size_t len = count < sizeof (readahead_buf) ? count : sizeof (readahead_buf); @@ -434,102 +424,191 @@ } #endif +#if !HAVE_READDIR_R +# define readdir_r aio_readdir_r + +static pthread_mutex_t readdirlock = PTHREAD_MUTEX_INITIALIZER; + +static int +readdir_r (DIR *dirp, struct dirent *ent, struct dirent **res) +{ + struct dirent *e; + int errorno; + + pthread_mutex_lock (&readdirlock); + + e = readdir (dirp); + errorno = errno; + + if (e) + { + *res = ent; + strcpy (ent->d_name, e->d_name); + } + else + *res = 0; + + pthread_mutex_unlock (&readdirlock); + + errno = errorno; + return e ? 0 : -1; +} +#endif + /* sendfile always needs emulation */ static ssize_t sendfile_ (int ofd, int ifd, off_t offset, size_t count) { - ssize_t res; + ssize_t res; - if (!count) - return 0; + if (!count) + return 0; #if HAVE_SENDFILE # if __linux - res = sendfile (ofd, ifd, &offset, count); + res = sendfile (ofd, ifd, &offset, count); # elif __freebsd - /* - * Of course, the freebsd sendfile is a dire hack with no thoughts - * wasted on making it similar to other I/O functions. - */ - { - off_t sbytes; - res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); - - if (res < 0 && sbytes) - /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */ - res = sbytes; - } + /* + * Of course, the freebsd sendfile is a dire hack with no thoughts + * wasted on making it similar to other I/O functions. + */ + { + off_t sbytes; + res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); + + if (res < 0 && sbytes) + /* maybe only on EAGAIN only: as usual, the manpage leaves you guessing */ + res = sbytes; + } # elif __hpux - res = sendfile (ofd, ifd, offset, count, 0, 0); + res = sendfile (ofd, ifd, offset, count, 0, 0); # elif __solaris - { - struct sendfilevec vec; - size_t sbytes; - - vec.sfv_fd = ifd; - vec.sfv_flag = 0; - vec.sfv_off = offset; - vec.sfv_len = count; - - res = sendfilev (ofd, &vec, 1, &sbytes); - - if (res < 0 && sbytes) - res = sbytes; - } + { + struct sendfilevec vec; + size_t sbytes; + + vec.sfv_fd = ifd; + vec.sfv_flag = 0; + vec.sfv_off = offset; + vec.sfv_len = count; + + res = sendfilev (ofd, &vec, 1, &sbytes); + + if (res < 0 && sbytes) + res = sbytes; + } # else - res = -1; - errno = ENOSYS; + res = -1; + errno = ENOSYS; # endif #endif - if (res < 0 - && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK + if (res < 0 + && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK #if __solaris - || errno == EAFNOSUPPORT || errno == EPROTOTYPE + || errno == EAFNOSUPPORT || errno == EPROTOTYPE #endif - ) - ) - { - /* emulate sendfile. this is a major pain in the ass */ - char *buf = malloc (4096); - res = 0; - - for (;;) - { - ssize_t cnt; - - cnt = pread (ifd, buf, 4096, offset); - - if (cnt <= 0) - { - if (cnt && !res) res = -1; - break; - } - - cnt = write (ofd, buf, cnt); - - if (cnt <= 0) - { - if (cnt && !res) res = -1; - break; - } - - offset += cnt; - res += cnt; - } - - { - int errorno = errno; - free (buf); - errno = errorno; - } - } + ) + ) + { + /* emulate sendfile. this is a major pain in the ass */ + char buf[4096]; + res = 0; + + for (;;) + { + ssize_t cnt; + + cnt = pread (ifd, buf, 4096, offset); - return res; + if (cnt <= 0) + { + if (cnt && !res) res = -1; + break; + } + + cnt = write (ofd, buf, cnt); + + if (cnt <= 0) + { + if (cnt && !res) res = -1; + break; + } + + offset += cnt; + res += cnt; + } + } + + return res; +} + +/* read a full directory */ +static int +scandir_ (const char *path, void **namesp) +{ + DIR *dirp = opendir (path); + union + { + struct dirent d; + char b [offsetof (struct dirent, d_name) + NAME_MAX + 1]; + } u; + struct dirent *entp; + char *name, *names; + int memlen = 4096; + int memofs = 0; + int res = 0; + int errorno; + + if (!dirp) + return -1; + + names = malloc (memlen); + + for (;;) + { + errno = 0, readdir_r (dirp, &u.d, &entp); + + if (!entp) + break; + + name = entp->d_name; + + if (name [0] != '.' || (name [1] && (name [1] != '.' || name [2]))) + { + int len = strlen (name) + 1; + + res++; + + while (memofs + len > memlen) + { + memlen *= 2; + names = realloc (names, memlen); + if (!names) + break; + } + + memcpy (names + memofs, name, len); + memofs += len; + } + } + + errorno = errno; + closedir (dirp); + + if (errorno) + { + free (names); + errno = errorno; + res = -1; + } + + *namesp = (void *)names; + return res; } /*****************************************************************************/ @@ -586,6 +665,7 @@ case REQ_FDATASYNC: req->result = fdatasync (req->fd); break; case REQ_FSYNC: req->result = fsync (req->fd); break; + case REQ_READDIR: req->result = scandir_ (req->dataptr, &req->data2ptr); break; case REQ_QUIT: break; @@ -621,6 +701,63 @@ return 0; } +/*****************************************************************************/ + +static void atfork_prepare (void) +{ + pthread_mutex_lock (&reqlock); + pthread_mutex_lock (&reslock); +#if !HAVE_PREADWRITE + pthread_mutex_lock (&preadwritelock); +#endif +#if !HAVE_READDIR_R + pthread_mutex_lock (&readdirlock); +#endif +} + +static void atfork_parent (void) +{ +#if !HAVE_READDIR_R + pthread_mutex_unlock (&readdirlock); +#endif +#if !HAVE_PREADWRITE + pthread_mutex_unlock (&preadwritelock); +#endif + pthread_mutex_unlock (&reslock); + pthread_mutex_unlock (&reqlock); +} + +static void atfork_child (void) +{ + aio_req prv; + + started = 0; + + while (reqs) + { + prv = reqs; + reqs = prv->next; + free_req (prv); + } + + reqs = reqe = 0; + + while (ress) + { + prv = ress; + ress = prv->next; + free_req (prv); + } + + ress = rese = 0; + + close (respipe [0]); + close (respipe [1]); + create_pipe (); + + atfork_parent (); +} + #define dREQ \ aio_req req; \ \ @@ -876,6 +1013,21 @@ req->dataptr = SvPVbyte_nolen (req->data); send_req (req); +} + +void +aio_readdir(pathname,callback=&PL_sv_undef) + SV * pathname + SV * callback + CODE: +{ + dREQ; + + req->type = REQ_READDIR; + req->data = newSVsv (pathname); + req->dataptr = SvPVbyte_nolen (req->data); + + send_req (req); } void