--- libeio/eio.c 2011/05/26 03:51:55 1.63 +++ libeio/eio.c 2011/06/20 07:28:15 1.74 @@ -37,7 +37,12 @@ * either the BSD or the GPL. */ +#ifndef _WIN32 +# include "config.h" +#endif + #include "eio.h" +#include "ecb.h" #ifdef EIO_STACKSIZE # define XTHREAD_STACKSIZE EIO_STACKSIZE @@ -56,6 +61,12 @@ #include #include +/* intptr_t comes from unistd.h, says POSIX/UNIX/tradition */ +/* intptr_t only comes from stdint.h, says idiot openbsd coder */ +#if HAVE_STDINT_H +# include +#endif + #ifndef EIO_FINISH # define EIO_FINISH(req) ((req)->finish) && !EIO_CANCELLED (req) ? (req)->finish (req) : 0 #endif @@ -73,7 +84,6 @@ /*doh*/ #else -# include "config.h" # include # include # include @@ -158,19 +168,6 @@ #define EIO_TICKS ((1000000 + 1023) >> 10) -/*****************************************************************************/ - -#if __GNUC__ >= 3 -# define expect(expr,value) __builtin_expect ((expr),(value)) -#else -# define expect(expr,value) (expr) -#endif - -#define expect_false(expr) expect ((expr) != 0, 0) -#define expect_true(expr) expect ((expr) != 0, 1) - -/*****************************************************************************/ - #define ETP_PRI_MIN EIO_PRI_MIN #define ETP_PRI_MAX EIO_PRI_MAX @@ -205,7 +202,8 @@ #define ETP_NUM_PRI (ETP_PRI_MAX - ETP_PRI_MIN + 1) /* calculate time difference in ~1/EIO_TICKS of a second */ -static int tvdiff (struct timeval *tv1, struct timeval *tv2) +ecb_inline int +tvdiff (struct timeval *tv1, struct timeval *tv2) { return (tv2->tv_sec - tv1->tv_sec ) * EIO_TICKS + ((tv2->tv_usec - tv1->tv_usec) >> 10); @@ -259,12 +257,14 @@ /* worker threads management */ -static void etp_worker_clear (etp_worker *wrk) +static void ecb_cold +etp_worker_clear (etp_worker *wrk) { ETP_WORKER_CLEAR (wrk); } -static void etp_worker_free (etp_worker *wrk) +static void ecb_cold +etp_worker_free (etp_worker *wrk) { wrk->next->prev = wrk->prev; wrk->prev->next = wrk->next; @@ -272,7 +272,8 @@ free (wrk); } -static unsigned int etp_nreqs (void) 
+static unsigned int +etp_nreqs (void) { int retval; if (WORDACCESS_UNSAFE) X_LOCK (reqlock); @@ -281,7 +282,8 @@ return retval; } -static unsigned int etp_nready (void) +static unsigned int +etp_nready (void) { unsigned int retval; @@ -292,7 +294,8 @@ return retval; } -static unsigned int etp_npending (void) +static unsigned int +etp_npending (void) { unsigned int retval; @@ -303,7 +306,8 @@ return retval; } -static unsigned int etp_nthreads (void) +static unsigned int +etp_nthreads (void) { unsigned int retval; @@ -327,7 +331,8 @@ static etp_reqq req_queue; static etp_reqq res_queue; -static int reqq_push (etp_reqq *q, ETP_REQ *req) +static int ecb_noinline +reqq_push (etp_reqq *q, ETP_REQ *req) { int pri = req->pri; req->next = 0; @@ -343,7 +348,8 @@ return q->size++; } -static ETP_REQ *reqq_shift (etp_reqq *q) +static ETP_REQ * ecb_noinline +reqq_shift (etp_reqq *q) { int pri; @@ -368,7 +374,8 @@ abort (); } -static void etp_thread_init (void) +static void ecb_cold +etp_thread_init (void) { X_MUTEX_CREATE (wrklock); X_MUTEX_CREATE (reslock); @@ -376,7 +383,8 @@ X_COND_CREATE (reqwait); } -static void etp_atfork_prepare (void) +static void ecb_cold +etp_atfork_prepare (void) { X_LOCK (wrklock); X_LOCK (reqlock); @@ -386,7 +394,8 @@ #endif } -static void etp_atfork_parent (void) +static void ecb_cold +etp_atfork_parent (void) { #if !HAVE_PREADWRITE X_UNLOCK (preadwritelock); @@ -396,7 +405,8 @@ X_UNLOCK (wrklock); } -static void etp_atfork_child (void) +static void ecb_cold +etp_atfork_child (void) { ETP_REQ *prv; @@ -426,14 +436,14 @@ etp_thread_init (); } -static void +static void ecb_cold etp_once_init (void) { etp_thread_init (); X_THREAD_ATFORK (etp_atfork_prepare, etp_atfork_parent, etp_atfork_child); } -static int +static int ecb_cold etp_init (void (*want_poll)(void), void (*done_poll)(void)) { static pthread_once_t doinit = PTHREAD_ONCE_INIT; @@ -448,7 +458,8 @@ X_THREAD_PROC (etp_proc); -static void etp_start_thread (void) +static void ecb_cold 
+etp_start_thread (void) { etp_worker *wrk = calloc (1, sizeof (etp_worker)); @@ -471,19 +482,21 @@ X_UNLOCK (wrklock); } -static void etp_maybe_start_thread (void) +static void +etp_maybe_start_thread (void) { - if (expect_true (etp_nthreads () >= wanted)) + if (ecb_expect_true (etp_nthreads () >= wanted)) return; /* todo: maybe use idle here, but might be less exact */ - if (expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) + if (ecb_expect_true (0 <= (int)etp_nthreads () + (int)etp_npending () - (int)etp_nreqs ())) return; etp_start_thread (); } -static void etp_end_thread (void) +static void ecb_cold +etp_end_thread (void) { eio_req *req = calloc (1, sizeof (eio_req)); @@ -500,7 +513,8 @@ X_UNLOCK (wrklock); } -static int etp_poll (void) +static int +etp_poll (void) { unsigned int maxreqs; unsigned int maxtime; @@ -540,7 +554,7 @@ --nreqs; X_UNLOCK (reqlock); - if (expect_false (req->type == EIO_GROUP && req->size)) + if (ecb_expect_false (req->type == EIO_GROUP && req->size)) { req->int1 = 1; /* mark request as delayed */ continue; @@ -548,11 +562,11 @@ else { int res = ETP_FINISH (req); - if (expect_false (res)) + if (ecb_expect_false (res)) return res; } - if (expect_false (maxreqs && !--maxreqs)) + if (ecb_expect_false (maxreqs && !--maxreqs)) break; if (maxtime) @@ -568,7 +582,8 @@ return -1; } -static void etp_cancel (ETP_REQ *req) +static void +etp_cancel (ETP_REQ *req) { X_LOCK (wrklock); req->flags |= EIO_FLAG_CANCELLED; @@ -577,14 +592,15 @@ eio_grp_cancel (req); } -static void etp_submit (ETP_REQ *req) +static void +etp_submit (ETP_REQ *req) { req->pri -= ETP_PRI_MIN; - if (expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; - if (expect_false (req->pri > ETP_PRI_MAX - ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; + if (ecb_expect_false (req->pri < ETP_PRI_MIN - ETP_PRI_MIN)) req->pri = ETP_PRI_MIN - ETP_PRI_MIN; + if (ecb_expect_false (req->pri > ETP_PRI_MAX - 
ETP_PRI_MIN)) req->pri = ETP_PRI_MAX - ETP_PRI_MIN; - if (expect_false (req->type == EIO_GROUP)) + if (ecb_expect_false (req->type == EIO_GROUP)) { /* I hope this is worth it :/ */ X_LOCK (reqlock); @@ -613,41 +629,47 @@ } } -static void etp_set_max_poll_time (double nseconds) +static void ecb_cold +etp_set_max_poll_time (double nseconds) { if (WORDACCESS_UNSAFE) X_LOCK (reslock); max_poll_time = nseconds * EIO_TICKS; if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); } -static void etp_set_max_poll_reqs (unsigned int maxreqs) +static void ecb_cold +etp_set_max_poll_reqs (unsigned int maxreqs) { if (WORDACCESS_UNSAFE) X_LOCK (reslock); max_poll_reqs = maxreqs; if (WORDACCESS_UNSAFE) X_UNLOCK (reslock); } -static void etp_set_max_idle (unsigned int nthreads) +static void ecb_cold +etp_set_max_idle (unsigned int nthreads) { if (WORDACCESS_UNSAFE) X_LOCK (reqlock); max_idle = nthreads; if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } -static void etp_set_idle_timeout (unsigned int seconds) +static void ecb_cold +etp_set_idle_timeout (unsigned int seconds) { if (WORDACCESS_UNSAFE) X_LOCK (reqlock); idle_timeout = seconds; if (WORDACCESS_UNSAFE) X_UNLOCK (reqlock); } -static void etp_set_min_parallel (unsigned int nthreads) +static void ecb_cold +etp_set_min_parallel (unsigned int nthreads) { if (wanted < nthreads) wanted = nthreads; } -static void etp_set_max_parallel (unsigned int nthreads) +static void ecb_cold +etp_set_max_parallel (unsigned int nthreads) { if (wanted > nthreads) wanted = nthreads; @@ -658,7 +680,8 @@ /*****************************************************************************/ -static void grp_try_feed (eio_req *grp) +static void +grp_try_feed (eio_req *grp) { while (grp->size < grp->int2 && !EIO_CANCELLED (grp)) { @@ -675,7 +698,8 @@ } } -static int grp_dec (eio_req *grp) +static int +grp_dec (eio_req *grp) { --grp->size; @@ -689,7 +713,8 @@ return 0; } -void eio_destroy (eio_req *req) +void +eio_destroy (eio_req *req) { if ((req)->flags & 
EIO_FLAG_PTR1_FREE) free (req->ptr1); if ((req)->flags & EIO_FLAG_PTR2_FREE) free (req->ptr2); @@ -697,7 +722,8 @@ EIO_DESTROY (req); } -static int eio_finish (eio_req *req) +static int +eio_finish (eio_req *req) { int res = EIO_FINISH (req); @@ -724,68 +750,81 @@ return res; } -void eio_grp_cancel (eio_req *grp) +void +eio_grp_cancel (eio_req *grp) { for (grp = grp->grp_first; grp; grp = grp->grp_next) eio_cancel (grp); } -void eio_cancel (eio_req *req) +void +eio_cancel (eio_req *req) { etp_cancel (req); } -void eio_submit (eio_req *req) +void +eio_submit (eio_req *req) { etp_submit (req); } -unsigned int eio_nreqs (void) +unsigned int +eio_nreqs (void) { return etp_nreqs (); } -unsigned int eio_nready (void) +unsigned int +eio_nready (void) { return etp_nready (); } -unsigned int eio_npending (void) +unsigned int +eio_npending (void) { return etp_npending (); } -unsigned int eio_nthreads (void) +unsigned int ecb_cold +eio_nthreads (void) { return etp_nthreads (); } -void eio_set_max_poll_time (double nseconds) +void ecb_cold +eio_set_max_poll_time (double nseconds) { etp_set_max_poll_time (nseconds); } -void eio_set_max_poll_reqs (unsigned int maxreqs) +void ecb_cold +eio_set_max_poll_reqs (unsigned int maxreqs) { etp_set_max_poll_reqs (maxreqs); } -void eio_set_max_idle (unsigned int nthreads) +void ecb_cold +eio_set_max_idle (unsigned int nthreads) { etp_set_max_idle (nthreads); } -void eio_set_idle_timeout (unsigned int seconds) +void ecb_cold +eio_set_idle_timeout (unsigned int seconds) { etp_set_idle_timeout (seconds); } -void eio_set_min_parallel (unsigned int nthreads) +void ecb_cold +eio_set_min_parallel (unsigned int nthreads) { etp_set_min_parallel (nthreads); } -void eio_set_max_parallel (unsigned int nthreads) +void ecb_cold +eio_set_max_parallel (unsigned int nthreads) { etp_set_max_parallel (nthreads); } @@ -865,7 +904,8 @@ # undef futimes # define futimes(fd,times) eio__futimes (fd, times) -static int eio__futimes (int fd, const struct timeval 
tv[2]) +static int +eio__futimes (int fd, const struct timeval tv[2]) { errno = ENOSYS; return -1; @@ -879,7 +919,7 @@ #endif /* sync_file_range always needs emulation */ -int +static int eio__sync_file_range (int fd, off_t offset, size_t nbytes, unsigned int flags) { #if HAVE_SYNC_FILE_RANGE @@ -935,84 +975,104 @@ static ssize_t eio__sendfile (int ofd, int ifd, off_t offset, size_t count, etp_worker *self) { + ssize_t written = 0; ssize_t res; if (!count) return 0; + for (;;) + { #if HAVE_SENDFILE # if __linux - res = sendfile (ofd, ifd, &offset, count); + off_t soffset = offset; + res = sendfile (ofd, ifd, &soffset, count); # elif __FreeBSD__ - /* - * Of course, the freebsd sendfile is a dire hack with no thoughts - * wasted on making it similar to other I/O functions. - */ - { - off_t sbytes; - res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); - - #if 0 /* according to the manpage, this is correct, but broken behaviour */ - /* freebsd' sendfile will return 0 on success */ - /* freebsd 8 documents it as only setting *sbytes on EINTR and EAGAIN, but */ - /* not on e.g. EIO or EPIPE - sounds broken */ - if ((res < 0 && (errno == EAGAIN || errno == EINTR) && sbytes) || res == 0) - res = sbytes; - #endif - - /* according to source inspection, this is correct, and useful behaviour */ - if (sbytes) - res = sbytes; - } + /* + * Of course, the freebsd sendfile is a dire hack with no thoughts + * wasted on making it similar to other I/O functions. + */ + off_t sbytes; + res = sendfile (ifd, ofd, offset, count, 0, &sbytes, 0); + + #if 0 /* according to the manpage, this is correct, but broken behaviour */ + /* freebsd' sendfile will return 0 on success */ + /* freebsd 8 documents it as only setting *sbytes on EINTR and EAGAIN, but */ + /* not on e.g. 
EIO or EPIPE - sounds broken */ + if ((res < 0 && (errno == EAGAIN || errno == EINTR) && sbytes) || res == 0) + res = sbytes; + #endif + + /* according to source inspection, this is correct, and useful behaviour */ + if (sbytes) + res = sbytes; # elif defined (__APPLE__) + off_t sbytes = count; + res = sendfile (ifd, ofd, offset, &sbytes, 0, 0); - { - off_t sbytes = count; - res = sendfile (ifd, ofd, offset, &sbytes, 0, 0); - - /* according to the manpage, sbytes is always valid */ - if (sbytes) - res = sbytes; - } + /* according to the manpage, sbytes is always valid */ + if (sbytes) + res = sbytes; # elif __hpux - res = sendfile (ofd, ifd, offset, count, 0, 0); + res = sendfile (ofd, ifd, offset, count, 0, 0); # elif __solaris - { - struct sendfilevec vec; - size_t sbytes; + struct sendfilevec vec; + size_t sbytes; - vec.sfv_fd = ifd; - vec.sfv_flag = 0; - vec.sfv_off = offset; - vec.sfv_len = count; + vec.sfv_fd = ifd; + vec.sfv_flag = 0; + vec.sfv_off = offset; + vec.sfv_len = count; - res = sendfilev (ofd, &vec, 1, &sbytes); + res = sendfilev (ofd, &vec, 1, &sbytes); - if (res < 0 && sbytes) - res = sbytes; - } + if (res < 0 && sbytes) + res = sbytes; # endif #elif defined (_WIN32) - - /* does not work, just for documentation of what would need to be done */ - { - HANDLE h = TO_SOCKET (ifd); - SetFilePointer (h, offset, 0, FILE_BEGIN); - res = TransmitFile (TO_SOCKET (ofd), h, count, 0, 0, 0, 0); - } + /* does not work, just for documentation of what would need to be done */ + /* actually, cannot be done like this, as TransmitFile changes the file offset, */ + /* libeio guarantees that the file offset does not change, and windows */ + /* has no way to get an independent handle to the same file description */ + HANDLE h = TO_SOCKET (ifd); + SetFilePointer (h, offset, 0, FILE_BEGIN); + res = TransmitFile (TO_SOCKET (ofd), h, count, 0, 0, 0, 0); #else - res = -1; - errno = ENOSYS; + res = -1; + errno = ENOSYS; #endif - if (res < 0 + /* we assume sendfile can copy 
at least 128mb in one go */ + if (res <= 128 * 1024 * 1024) + { + if (res > 0) + written += res; + + if (written) + return written; + + break; + } + else + { + /* if we requested more, then probably the kernel was lazy */ + written += res; + offset += res; + count -= res; + + if (!count) + return written; + } + } + + if (res < 0 && (errno == ENOSYS || errno == EINVAL || errno == ENOTSOCK /* BSDs */ #ifdef ENOTSUP /* sigh, if the steenking pile called openbsd would only try to at least compile posix code... */ @@ -1248,6 +1308,9 @@ /* the corresponding closedir is in ETP_WORKER_CLEAR */ self->dirp = dirp = opendir (req->ptr1); + if (req->flags & EIO_FLAG_PTR1_FREE) + free (req->ptr1); + req->flags |= EIO_FLAG_PTR1_FREE | EIO_FLAG_PTR2_FREE; req->ptr1 = dents = flags ? malloc (dentalloc * sizeof (eio_dirent)) : 0; req->ptr2 = names = malloc (namesalloc); @@ -1281,7 +1344,6 @@ /* now partition dirs to the front, and non-dirs to the back */ /* by walking from both sides and swapping if necessary */ - /* also clear score, so it doesn't influence sorting */ while (oth > dir) { if (dir->type == EIO_DT_DIR) @@ -1294,7 +1356,7 @@ } } - /* now sort the dirs only */ + /* now sort the dirs only (dirs all have the same score) */ eio_dent_sort (dents, dir - dents, 0, inode_bits); } @@ -1309,7 +1371,7 @@ { int len = D_NAMLEN (entp) + 1; - while (expect_false (namesoffs + len > namesalloc)) + while (ecb_expect_false (namesoffs + len > namesalloc)) { namesalloc *= 2; X_LOCK (wrklock); @@ -1326,7 +1388,7 @@ { struct eio_dirent *ent; - if (expect_false (dentoffs == dentalloc)) + if (ecb_expect_false (dentoffs == dentalloc)) { dentalloc *= 2; X_LOCK (wrklock); @@ -1496,7 +1558,7 @@ # define eio__msync(a,b,c) ((errno = ENOSYS), -1) #else -int +static int eio__msync (void *mem, size_t len, int flags) { eio_page_align (&mem, &len); @@ -1516,9 +1578,13 @@ #endif -int -eio__mtouch (void *mem, size_t len, int flags) +static int +eio__mtouch (eio_req *req) { + void *mem = req->ptr2; + size_t 
len = req->size; + int flags = req->int1; + eio_page_align (&mem, &len); { @@ -1528,9 +1594,9 @@ if (addr < end) if (flags & EIO_MT_MODIFY) /* modify */ - do { *((volatile sig_atomic_t *)addr) |= 0; } while ((addr += page) < len); + do { *((volatile sig_atomic_t *)addr) |= 0; } while ((addr += page) < len && !EIO_CANCELLED (req)); else - do { *((volatile sig_atomic_t *)addr) ; } while ((addr += page) < len); + do { *((volatile sig_atomic_t *)addr) ; } while ((addr += page) < len && !EIO_CANCELLED (req)); } return 0; @@ -1628,12 +1694,14 @@ /*****************************************************************************/ -int eio_init (void (*want_poll)(void), void (*done_poll)(void)) +int ecb_cold +eio_init (void (*want_poll)(void), void (*done_poll)(void)) { return etp_init (want_poll, done_poll); } -static void eio_api_destroy (eio_req *req) +ecb_inline void +eio_api_destroy (eio_req *req) { free (req); } @@ -1662,7 +1730,8 @@ return 0; \ } -static void eio_execute (etp_worker *self, eio_req *req) +static void +eio_execute (etp_worker *self, eio_req *req) { switch (req->type) { @@ -1714,7 +1783,7 @@ case EIO_FSYNC: req->result = fsync (req->int1); break; case EIO_FDATASYNC: req->result = fdatasync (req->int1); break; case EIO_MSYNC: req->result = eio__msync (req->ptr2, req->size, req->int1); break; - case EIO_MTOUCH: req->result = eio__mtouch (req->ptr2, req->size, req->int1); break; + case EIO_MTOUCH: req->result = eio__mtouch (req); break; case EIO_MLOCK: req->result = eio__mlock (req->ptr2, req->size); break; case EIO_MLOCKALL: req->result = eio__mlockall (req->int1); break; case EIO_SYNC_FILE_RANGE: req->result = eio__sync_file_range (req->int1, req->offs, req->size, req->int2); break; @@ -1768,7 +1837,7 @@ break; case EIO_CUSTOM: - ((void (*)(eio_req *))req->feed) (req); + req->feed (req); break; default: @@ -1999,9 +2068,9 @@ return eio__2path (EIO_RENAME, path, new_path, pri, cb, data); } -eio_req *eio_custom (eio_cb execute, int pri, eio_cb cb, void *data) 
+eio_req *eio_custom (void (*execute)(eio_req *), int pri, eio_cb cb, void *data) { - REQ (EIO_CUSTOM); req->feed = (void (*)(eio_req *))execute; SEND; + REQ (EIO_CUSTOM); req->feed = execute; SEND; } #endif @@ -2020,7 +2089,8 @@ /*****************************************************************************/ /* grp functions */ -void eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit) +void +eio_grp_feed (eio_req *grp, void (*feed)(eio_req *req), int limit) { grp->int2 = limit; grp->feed = feed; @@ -2028,14 +2098,16 @@ grp_try_feed (grp); } -void eio_grp_limit (eio_req *grp, int limit) +void +eio_grp_limit (eio_req *grp, int limit) { grp->int2 = limit; grp_try_feed (grp); } -void eio_grp_add (eio_req *grp, eio_req *req) +void +eio_grp_add (eio_req *grp, eio_req *req) { assert (("cannot add requests to IO::AIO::GRP after the group finished", grp->int1 != 2)); @@ -2056,7 +2128,8 @@ /*****************************************************************************/ /* misc garbage */ -ssize_t eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count) +ssize_t +eio_sendfile_sync (int ofd, int ifd, off_t offset, size_t count) { etp_worker wrk; ssize_t ret;