--- libev/ev_linuxaio.c 2019/06/24 04:54:08 1.18 +++ libev/ev_linuxaio.c 2019/06/24 22:27:29 1.24 @@ -55,7 +55,7 @@ #define EV_LINUXAIO_DEPTH (128 / 2 - 2 - 1) /* max. number of io events per batch */ /*****************************************************************************/ -/* syscall wrapdadoop */ +/* syscall wrapdadoop - this section has the raw syscall definitions */ #include <sys/syscall.h> /* no glibc wrappers */ @@ -82,35 +82,35 @@ inline_size int -ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp) +evsys_io_setup (unsigned nr_events, aio_context_t *ctx_idp) { return syscall (SYS_io_setup, nr_events, ctx_idp); } inline_size int -ev_io_destroy (aio_context_t ctx_id) +evsys_io_destroy (aio_context_t ctx_id) { return syscall (SYS_io_destroy, ctx_id); } inline_size int -ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) +evsys_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) { return syscall (SYS_io_submit, ctx_id, nr, cbp); } inline_size int -ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) +evsys_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) { return syscall (SYS_io_cancel, ctx_id, cbp, result); } inline_size int -ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) +evsys_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) { return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); } @@ -127,17 +127,23 @@ inline_size void -linuxaio_array_needsize_iocbp (ANIOCBP *base, int count) +linuxaio_array_needsize_iocbp (ANIOCBP *base, int offset, int count) { - /* TODO: quite the overhead to allocate every iocb separately, maybe use our own alocator? */ while (count--) { - *base = (ANIOCBP)ev_malloc (sizeof (**base)); - /* TODO: full zero initialize required? 
*/ - memset (*base, 0, sizeof (**base)); - /* would be nice to initialize fd/data as well, but array_needsize API doesn't support that */ - (*base)->io.aio_lio_opcode = IOCB_CMD_POLL; - ++base; + /* TODO: quite the overhead to allocate every iocb separately, maybe use our own allocator? */ + ANIOCBP iocb = (ANIOCBP)ev_malloc (sizeof (*iocb)); + + /* full zero initialise is probably not required at the moment, but + * this is not well documented, so we better do it. + */ + memset (iocb, 0, sizeof (*iocb)); + + iocb->io.aio_lio_opcode = IOCB_CMD_POLL; + iocb->io.aio_data = offset; + iocb->io.aio_fildes = offset; + + base [offset++] = iocb; } } @@ -155,7 +161,7 @@ linuxaio_modify (EV_P_ int fd, int oev, int nev) { array_needsize (ANIOCBP, linuxaio_iocbps, linuxaio_iocbpmax, fd + 1, linuxaio_array_needsize_iocbp); - struct aniocb *iocb = linuxaio_iocbps [fd]; + ANIOCBP iocb = linuxaio_iocbps [fd]; #if EPOLL_FALLBACK if (iocb->io.aio_reqprio < 0) @@ -166,13 +172,11 @@ #endif if (iocb->io.aio_buf) - ev_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0); /* always returns an error relevant kernels */ + evsys_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0); /* always returns an error on relevant kernels */ if (nev) { - iocb->io.aio_data = fd; - iocb->io.aio_fildes = fd; - iocb->io.aio_buf = + iocb->io.aio_buf = (nev & EV_READ ? POLLIN : 0) | (nev & EV_WRITE ? 
POLLOUT : 0); @@ -184,6 +188,58 @@ } } +#if EPOLL_FALLBACK + +static void +linuxaio_rearm_epoll (EV_P_ struct iocb *iocb, int op) +{ + struct epoll_event eev; + + eev.events = EPOLLONESHOT; + if (iocb->aio_buf & POLLIN ) eev.events |= EPOLLIN ; + if (iocb->aio_buf & POLLOUT) eev.events |= EPOLLOUT; + eev.data.fd = iocb->aio_fildes; + + if (epoll_ctl (backend_fd, op, iocb->aio_fildes, &eev) < 0) + ev_syserr ("(libeio) linuxaio epoll_ctl"); +} + +static void +linuxaio_epoll_cb (EV_P_ struct ev_io *w, int revents) +{ + struct epoll_event events[16]; + + for (;;) + { + int idx; + int res = epoll_wait (backend_fd, events, sizeof (events) / sizeof (events [0]), 0); + + if (expect_false (res < 0)) + ev_syserr ("(libev) linuxaio epoll_wait"); + else if (!res) + break; + + for (idx = res; idx--; ) + { + int fd = events [idx].data.fd; + uint32_t ev = events [idx].events; + + assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax)); + + linuxaio_rearm_epoll (EV_A_ &linuxaio_iocbps [fd]->io, EPOLL_CTL_MOD); + + fd_event (EV_A_ fd, + (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0) + | (ev & (EPOLLIN | EPOLLERR | EPOLLHUP) ? EV_READ : 0)); + } + + if (res < sizeof (events) / sizeof (events [0])) + break; + } +} + +#endif + static void linuxaio_parse_events (EV_P_ struct io_event *ev, int nr) { @@ -194,21 +250,18 @@ assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax)); - /* linux aio is oneshot: rearm fd */ + /* linux aio is oneshot: rearm fd. TODO: this does more work than needed */ linuxaio_iocbps [fd]->io.aio_buf = 0; anfds [fd].events = 0; fd_change (EV_A_ fd, 0); /* feed events, we do not expect or handle POLLNVAL */ - if (expect_false (res & POLLNVAL)) - fd_kill (EV_A_ fd); - else - fd_event ( - EV_A_ - fd, - (res & (POLLOUT | POLLERR | POLLHUP) ? EV_WRITE : 0) - | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0) - ); + fd_event ( + EV_A_ + fd, + (res & (POLLOUT | POLLERR | POLLHUP) ? 
EV_WRITE : 0) + | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0) + ); --nr; ++ev; @@ -248,8 +301,7 @@ linuxaio_parse_events (EV_A_ ring->io_events, tail); } - /* TODO: we only need a compiler barrier here, not a read fence */ - ECB_MEMORY_FENCE_RELEASE; + ECB_MEMORY_FENCE_RELAXED; /* as an extension to C, we hope that the volatile will make this atomic and once-only */ *(volatile unsigned *)&ring->head = tail; /* make sure kernel can see our new head value - probably not required */ @@ -274,10 +326,14 @@ /* this degrades to one event per loop iteration */ /* if the ring buffer changes layout, but so be it */ + EV_RELEASE_CB; + ts.tv_sec = (long)timeout; ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); - res = ev_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts); + res = evsys_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts); + + EV_ACQUIRE_CB; if (res < 0) if (errno == EINTR) @@ -292,22 +348,6 @@ } } -#if EPOLL_FALLBACK -static void -linuxaio_rearm_epoll (EV_P_ struct iocb *iocb, int op) -{ - struct epoll_event eev; - - eev.events = EPOLLONESHOT; - if (iocb->aio_buf & POLLIN ) eev.events |= EPOLLIN ; - if (iocb->aio_buf & POLLOUT) eev.events |= EPOLLOUT; - eev.data.fd = iocb->aio_fildes; - - if (epoll_ctl (backend_fd, op, iocb->aio_fildes, &eev) < 0) - ev_syserr ("(libeio) linuxaio epoll_ctl"); -} -#endif - static void linuxaio_poll (EV_P_ ev_tstamp timeout) { @@ -320,7 +360,15 @@ /* which allows us to pinpoint the errornous iocb */ for (submitted = 0; submitted < linuxaio_submitcnt; ) { - int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); +#if 0 + int res; + if (linuxaio_submits[submitted]->aio_fildes == backend_fd) + res = evsys_io_submit (linuxaio_ctx, 1, linuxaio_submits + submitted); + else + { res = -1; errno = EINVAL; }; +#else + int res = evsys_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); +#endif if 
(expect_false (res < 0)) if (errno == EAGAIN) @@ -331,9 +379,14 @@ * In this case, we just try in next loop iteration. * This should not result in a few fds taking priority, as the interface * is one-shot, and we submit iocb's in a round-robin fashion. + * TODO: maybe make "submitted" persistent, so we don't have to memmove? */ - memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); - linuxaio_submitcnt -= submitted; + if (ecb_expect_false (submitted)) + { + memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); + linuxaio_submitcnt -= submitted; + } + timeout = 0; break; } @@ -343,16 +396,24 @@ /* This happens for unsupported fds, officially, but in my testing, * also randomly happens for supported fds. We fall back to good old * poll() here, under the assumption that this is a very rare case. - * See https://lore.kernel.org/patchwork/patch/1047453/ for evidence - * that the problem is known, but ignored. + * See https://lore.kernel.org/patchwork/patch/1047453/ to see + * discussion about such a case (ttys) where polling for POLLIN + * fails but POLLIN|POLLOUT works. 
*/ struct iocb *iocb = linuxaio_submits [submitted]; - res = 1; /* skip this iocb */ - linuxaio_rearm_epoll (EV_A_ iocb, EPOLL_CTL_ADD); + linuxaio_rearm_epoll (EV_A_ linuxaio_submits [submitted], EPOLL_CTL_ADD); iocb->aio_reqprio = -1; /* mark iocb as epoll */ + + res = 1; /* skip this iocb */ } #endif + else if (errno == EBADF) + { + fd_kill (EV_A_ linuxaio_submits [submitted]->aio_fildes); + + res = 1; /* skip this iocb */ + } else ev_syserr ("(libev) linuxaio io_submit"); @@ -366,44 +427,6 @@ linuxaio_get_events (EV_A_ timeout); } -#if EPOLL_FALLBACK - -static void -linuxaio_epoll_cb (EV_P_ struct ev_io *w, int revents) -{ - struct epoll_event events[16]; - - for (;;) - { - int idx; - int res = epoll_wait (backend_fd, events, sizeof (events) / sizeof (events [0]), 0); - - if (expect_false (res < 0)) - ev_syserr ("(libev) linuxaio epoll_wait"); - else if (!res) - break; - - for (idx = res; idx--; ) - { - int fd = events [idx].data.fd; - uint32_t ev = events [idx].events; - - assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax)); - - linuxaio_rearm_epoll (EV_A_ &linuxaio_iocbps [fd]->io, EPOLL_CTL_MOD); - - fd_event (EV_A_ fd, - (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0) - | (ev & (EPOLLIN | EPOLLERR | EPOLLHUP) ? 
EV_READ : 0)); - } - - if (res < sizeof (events) / sizeof (events [0])) - break; - } -} - -#endif - inline_size int linuxaio_init (EV_P_ int flags) @@ -421,18 +444,19 @@ #endif linuxaio_ctx = 0; - if (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) + if (evsys_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) return 0; #if EPOLL_FALLBACK backend_fd = ev_epoll_create (); if (backend_fd < 0) { - ev_io_destroy (linuxaio_ctx); + evsys_io_destroy (linuxaio_ctx); return 0; } ev_io_init (EV_A_ &linuxaio_epoll_w, linuxaio_epoll_cb, backend_fd, EV_READ); + ev_set_priority (&linuxaio_epoll_w, EV_MAXPRI); ev_io_start (EV_A_ &linuxaio_epoll_w); ev_unref (EV_A); /* watcher should not keep loop alive */ #endif @@ -458,7 +482,7 @@ close (backend_fd); #endif linuxaio_free_iocbp (EV_A); - ev_io_destroy (linuxaio_ctx); + evsys_io_destroy (linuxaio_ctx); } inline_size @@ -470,7 +494,7 @@ linuxaio_submitcnt = 0; /* all pointers were invalidated */ linuxaio_ctx = 0; - while (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) + while (evsys_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) ev_syserr ("(libev) linuxaio io_setup"); #if EPOLL_FALLBACK