--- libev/ev_linuxaio.c 2019/06/20 23:59:30 1.3 +++ libev/ev_linuxaio.c 2019/06/25 06:43:04 1.29 @@ -37,26 +37,70 @@ * either the BSD or the GPL. */ +/* + * general notes about linux aio: + * + * a) at first, the linux aio IOCB_CMD_POLL functionality introduced in + * 4.18 looks too good to be true: both watchers and events can be + * batched, and events can even be handled in userspace using + * a ring buffer shared with the kernel. watchers can be canceled + * regardless of whether the fd has been closed. no problems with fork. + * ok, the ring buffer is 200% undocumented (there isn't even a + * header file), but otherwise, it's pure bliss! + * b) ok, watchers are one-shot, so you have to re-arm active ones + * on every iteration. so much for syscall-less event handling, + * but at least these re-arms can be batched, no big deal, right? + * c) well, linux as usual: the documentation lies to you: io_submit + * sometimes returns EINVAL because the kernel doesn't feel like + * handling your poll mask - ttys can be polled for POLLOUT, + * POLLOUT|POLLIN, but polling for POLLIN fails. just great, + * so we have to fall back to something else (hello, epoll), + * but at least the fallback can be slow, because these are + * exceptional cases, right? + * d) hmm, you have to tell the kernel the maximum number of watchers + * you want to queue when initialising the aio context. but of + * course the real limit is magically calculated in the kernel, and + * is often higher then we asked for. so we just have to destroy + * the aio context and re-create it a bit larger if we hit the limit. + * (starts to remind you of epoll? well, it's a bit more deterministic + * and less gambling, but still ugly as hell). + * e) that's when you find out you can also hit an arbitrary system-wide + * limit. or the kernel simply doesn't want to handle your watchers. + * what the fuck do we do then? you guessed it, in the middle + * of event handling we have to switch to 100% epoll polling. and + * that better is as fast as normal epoll polling, so you practically + * have to use the normal epoll backend with all its quirks. + * f) end result of this train wreck: it inherits all the disadvantages + * from epoll, while adding a number on its own. why even bother to use + * it? because if conditions are right and your fds are supported and you + * don't hit a limit, this backend is actually faster, doesn't gamble with + * your fds, batches watchers and events and doesn't require costly state + * recreates. well, until it does. + * g) all of this makes this backend use almost twice as much code as epoll. + * which in turn uses twice as much code as poll. and that#s not counting + * the fact that this backend also depends on the epoll backend, making + * it three times as much code as poll, or kqueue. + * h) bleah. why can't linux just do kqueue. sure kqueue is ugly, but by now + * it's clear that whatever linux comes up with is far, far, far worse. + */ + #include /* actually linux/time.h, but we must assume they are compatible */ #include #include -/* we try to fill 4kn pages exactly. - * the ring buffer header is 32 bytes, every io event is 32 bytes. - * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer - * so the calculation below will use "exactly" 8kB for the ring buffer - */ -#define EV_LINUXAIO_DEPTH (256 / 2 - 2 - 1) /* max. number of io events per batch */ - /*****************************************************************************/ -/* syscall wrapdadoop */ +/* syscall wrapdadoop - this section has the raw api/abi definitions */ #include /* no glibc wrappers */ -/* aio_abi.h is not verioned in any way, so we cannot test for its existance */ +/* aio_abi.h is not versioned in any way, so we cannot test for its existance */ #define IOCB_CMD_POLL 5 -/* taken from linux/fs/aio.c */ +/* taken from linux/fs/aio.c. yup, that's a .c file. + * not only is this totally undocumented, not even the source code + * can tell you what the future semantics of compat_features and + * incompat_features are, or what header_length actually is for. + */ #define AIO_RING_MAGIC 0xa10a10a1 #define AIO_RING_INCOMPAT_FEATURES 0 struct aio_ring @@ -74,42 +118,81 @@ struct io_event io_events[0]; }; -static int -ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp) +inline_size +int +evsys_io_setup (unsigned nr_events, aio_context_t *ctx_idp) { return syscall (SYS_io_setup, nr_events, ctx_idp); } -static int -ev_io_destroy (aio_context_t ctx_id) +inline_size +int +evsys_io_destroy (aio_context_t ctx_id) { return syscall (SYS_io_destroy, ctx_id); } -static int -ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) +inline_size +int +evsys_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) { return syscall (SYS_io_submit, ctx_id, nr, cbp); } -static int -ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) +inline_size +int +evsys_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) { return syscall (SYS_io_cancel, ctx_id, cbp, result); } -static int -ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) +inline_size +int +evsys_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) { return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); } -typedef void (*ev_io_cb) (long nr, struct io_event *events); - /*****************************************************************************/ /* actual backed implementation */ -/* two iocbs for every fd, one for read, one for write */ +ecb_cold +static int +linuxaio_nr_events (EV_P) +{ + /* we start with 16 iocbs and incraese from there + * that's tiny, but the kernel has a rather low system-wide + * limit that can be reached quickly, so let's be parsimonious + * with this resource. + * Rest assured, the kernel generously rounds up small and big numbers + * in different ways (but doesn't seem to charge you for it). + * The 15 here is because the kernel usually has a power of two as aio-max-nr, + * and this helps to take advantage of that limit. + */ + + /* we try to fill 4kB pages exactly. + * the ring buffer header is 32 bytes, every io event is 32 bytes. + * the kernel takes the io requests number, doubles it, adds 2 + * and adds the ring buffer. + * the way we use this is by starting low, and then roughly doubling the + * size each time we hit a limit. + */ + + int requests = 15 << linuxaio_iteration; + int one_page = (4096 + / sizeof (struct io_event) ) / 2; /* how many fit into one page */ + int first_page = ((4096 - sizeof (struct aio_ring)) + / sizeof (struct io_event) - 2) / 2; /* how many fit into the first page */ + + /* if everything fits into one page, use count exactly */ + if (requests > first_page) + /* otherwise, round down to full pages and add the first page */ + requests = requests / one_page * one_page + first_page; + + return requests; +} + +/* we use out own wrapper structure in case we ever want to do something "clever" */ typedef struct aniocb { struct iocb io; @@ -118,45 +201,57 @@ inline_size void -linuxaio_array_needsize_iocbp (ANIOCBP *base, int count) +linuxaio_array_needsize_iocbp (ANIOCBP *base, int offset, int count) { - /* TODO: quite the overhead to allocate every iocb separately */ while (count--) { - *base = (ANIOCBP)ev_malloc (sizeof (**base)); - /* TODO: full zero initialize required? */ - memset (*base, 0, sizeof (**base)); - /* would be nice to initialize fd/data as well */ - (*base)->io.aio_lio_opcode = IOCB_CMD_POLL; - ++base; + /* TODO: quite the overhead to allocate every iocb separately, maybe use our own allocator? */ + ANIOCBP iocb = (ANIOCBP)ev_malloc (sizeof (*iocb)); + + /* full zero initialise is probably not required at the moment, but + * this is not well documented, so we better do it. + */ + memset (iocb, 0, sizeof (*iocb)); + + iocb->io.aio_lio_opcode = IOCB_CMD_POLL; + iocb->io.aio_data = offset; + iocb->io.aio_fildes = offset; + + base [offset++] = iocb; } } +ecb_cold static void linuxaio_free_iocbp (EV_P) { while (linuxaio_iocbpmax--) ev_free (linuxaio_iocbps [linuxaio_iocbpmax]); - /* next resize will completely reallocate the array */ - linuxaio_iocbpmax = 0; - linuxaio_submitcnt = 0; /* all pointers invalidated */ + linuxaio_iocbpmax = 0; /* next resize will completely reallocate the array, at some overhead */ } static void linuxaio_modify (EV_P_ int fd, int oev, int nev) { array_needsize (ANIOCBP, linuxaio_iocbps, linuxaio_iocbpmax, fd + 1, linuxaio_array_needsize_iocbp); - struct aniocb *iocb = linuxaio_iocbps [fd]; + ANIOCBP iocb = linuxaio_iocbps [fd]; + + if (iocb->io.aio_reqprio < 0) + { + /* we handed this fd over to epoll, so undo this first */ + /* we do it manually because the optimisations on epoll_modfy won't do us any good */ + epoll_ctl (backend_fd, EPOLL_CTL_DEL, fd, 0); + iocb->io.aio_reqprio = 0; + } if (iocb->io.aio_buf) - ev_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0); /* always fails in relevant kernels */ + /* io_cancel always returns some error on relevant kernels, but works */ + evsys_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0); if (nev) { - iocb->io.aio_data = fd; - iocb->io.aio_fildes = fd; - iocb->io.aio_buf = + iocb->io.aio_buf = (nev & EV_READ ? POLLIN : 0) | (nev & EV_WRITE ? POLLOUT : 0); @@ -169,6 +264,20 @@ } static void +linuxaio_epoll_cb (EV_P_ struct ev_io *w, int revents) +{ + epoll_poll (EV_A_ 0); +} + +static void +linuxaio_fd_rearm (EV_P_ int fd) +{ + anfds [fd].events = 0; + linuxaio_iocbps [fd]->io.aio_buf = 0; + fd_change (EV_A_ fd, EV_ANFD_REIFY); +} + +static void linuxaio_parse_events (EV_P_ struct io_event *ev, int nr) { while (nr) @@ -178,55 +287,58 @@ assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax)); - /* linux aio is oneshot: rearm fd */ - linuxaio_iocbps [fd]->io.aio_buf = 0; - anfds [fd].events = 0; - fd_change (EV_A_ fd, 0); - /* feed events, we do not expect or handle POLLNVAL */ - if (ecb_expect_false (res & POLLNVAL)) - fd_kill (EV_A_ fd); - else - fd_event ( - EV_A_ - fd, - (res & (POLLOUT | POLLERR | POLLHUP) ? EV_WRITE : 0) - | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0) - ); + fd_event ( + EV_A_ + fd, + (res & (POLLOUT | POLLERR | POLLHUP) ? EV_WRITE : 0) + | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0) + ); + + /* linux aio is oneshot: rearm fd. TODO: this does more work than needed */ + linuxaio_fd_rearm (EV_A_ fd); --nr; ++ev; } } -/* get any events from ringbuffer, return true if any were handled */ +/* get any events from ring buffer, return true if any were handled */ static int linuxaio_get_events_from_ring (EV_P) { struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx; - ECB_MEMORY_FENCE_ACQUIRE; - - unsigned head = ring->head; + /* the kernel reads and writes both of these variables, */ + /* as a C extension, we assume that volatile use here */ + /* both makes reads atomic and once-only */ + unsigned head = *(volatile unsigned *)&ring->head; unsigned tail = *(volatile unsigned *)&ring->tail; - if (ring->magic != AIO_RING_MAGIC - || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES - || ring->header_length != sizeof (struct aio_ring) /* TODO: or use it to find io_event[0]? */ - || head == tail) + if (head == tail) return 0; + /* bail out if the ring buffer doesn't match the expected layout */ + if (expect_false (ring->magic != AIO_RING_MAGIC) + || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES + || ring->header_length != sizeof (struct aio_ring)) /* TODO: or use it to find io_event[0]? */ + return 0; + + /* make sure the events up to tail are visible */ + ECB_MEMORY_FENCE_ACQUIRE; + /* parse all available events, but only once, to avoid starvation */ if (tail > head) /* normal case around */ linuxaio_parse_events (EV_A_ ring->io_events + head, tail - head); - else + else /* wrapped around */ { - /* wrapped around */ linuxaio_parse_events (EV_A_ ring->io_events + head, ring->nr - head); linuxaio_parse_events (EV_A_ ring->io_events, tail); } - ring->head = tail; + ECB_MEMORY_FENCE_RELEASE; + /* as an extension to C, we hope that the volatile will make this atomic and once-only */ + *(volatile unsigned *)&ring->head = tail; return 1; } @@ -237,31 +349,45 @@ linuxaio_get_events (EV_P_ ev_tstamp timeout) { struct timespec ts; - struct io_event ioev; + struct io_event ioev[1]; int res; if (linuxaio_get_events_from_ring (EV_A)) return; /* no events, so wait for at least one, then poll ring buffer again */ - /* this degraded to one event per loop iteration */ + /* this degrades to one event per loop iteration */ /* if the ring buffer changes layout, but so be it */ + EV_RELEASE_CB; + ts.tv_sec = (long)timeout; ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); - res = ev_io_getevents (linuxaio_ctx, 1, 1, &ioev, &ts); + res = evsys_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts); + + EV_ACQUIRE_CB; if (res < 0) - ev_syserr ("(libev) io_getevents"); + if (errno == EINTR) + /* ignored */; + else + ev_syserr ("(libev) linuxaio io_getevents"); else if (res) { /* at least one event received, handle it and any remaining ones in the ring buffer */ - linuxaio_parse_events (EV_A_ &ioev, 1); + linuxaio_parse_events (EV_A_ ioev, res); linuxaio_get_events_from_ring (EV_A); } } +static int +linuxaio_io_setup (EV_P) +{ + linuxaio_ctx = 0; + return evsys_io_setup (linuxaio_nr_events (EV_A), &linuxaio_ctx); +} + static void linuxaio_poll (EV_P_ ev_tstamp timeout) { @@ -271,28 +397,72 @@ /* io_submit might return less than the requested number of iocbs */ /* this is, afaics, only because of errors, but we go by the book and use a loop, */ - /* which allows us to pinpoint the errornous iocb */ + /* which allows us to pinpoint the erroneous iocb */ for (submitted = 0; submitted < linuxaio_submitcnt; ) { - int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); + int res = evsys_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); - if (res < 0) - if (errno == EAGAIN) + if (expect_false (res < 0)) + if (errno == EINVAL) { - /* This happens when the ring buffer is full, at least. I assume this means - * that the event was queued synchronously during io_submit, and thus - * the buffer overflowd. - * In this case, we just try next loop iteration. + /* This happens for unsupported fds, officially, but in my testing, + * also randomly happens for supported fds. We fall back to good old + * poll() here, under the assumption that this is a very rare case. + * See https://lore.kernel.org/patchwork/patch/1047453/ to see + * discussion about such a case (ttys) where polling for POLLIN + * fails but POLLIN|POLLOUT works. */ - memcpy (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); - linuxaio_submitcnt -= submitted; + struct iocb *iocb = linuxaio_submits [submitted]; + epoll_modify (EV_A_ iocb->aio_fildes, 0, anfds [iocb->aio_fildes].events); + iocb->aio_reqprio = -1; /* mark iocb as epoll */ + + res = 1; /* skip this iocb - another iocb, another chance */ + } + else if (errno == EAGAIN) + { + /* This happens when the ring buffer is full, or some other shit we + * don't know and isn't documented. Most likely because we have too + * many requests and linux aio can't be assed to handle them. + * In this case, we try to allocate a larger ring buffer, freeing + * ours first. This might fail, in which case we have to fall back to 100% + * epoll. + * God, how I hate linux not getting its act together. Ever. + */ + evsys_io_destroy (linuxaio_ctx); + linuxaio_submitcnt = 0; + + /* rearm all fds with active iocbs */ + { + int fd; + for (fd = 0; fd < linuxaio_iocbpmax; ++fd) + if (linuxaio_iocbps [fd]->io.aio_buf) + linuxaio_fd_rearm (EV_A_ fd); + } + + ++linuxaio_iteration; + if (linuxaio_io_setup (EV_A) < 0) + { + /* to bad, we can't get a new aio context, go 100% epoll */ + linuxaio_free_iocbp (EV_A); + ev_io_stop (EV_A_ &linuxaio_epoll_w); + ev_ref (EV_A); + linuxaio_ctx = 0; + backend_modify = epoll_modify; + backend_poll = epoll_poll; + } + timeout = 0; - break; + /* it's easiest to handle this mess in another iteration */ + return; + } + else if (errno == EBADF) + { + fd_kill (EV_A_ linuxaio_submits [submitted]->aio_fildes); + + res = 1; /* skip this iocb */ } else - /* TODO: we get EAGAIN when the ring buffer is full for some reason */ - /* TODO: should we always just try next time? */ - ev_syserr ("(libev) io_submit"); + ev_syserr ("(libev) linuxaio io_submit"); submitted += res; } @@ -310,13 +480,26 @@ { /* would be great to have a nice test for IOCB_CMD_POLL instead */ /* also: test some semi-common fd types, such as files and ttys in recommended_backends */ - if (ev_linux_version () < 0x041200) /* 4.18 introduced IOCB_CMD_POLL */ + /* 4.18 introduced IOCB_CMD_POLL, 4.19 made epoll work, and we need that */ + if (ev_linux_version () < 0x041300) return 0; - linuxaio_ctx = 0; - if (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) + if (!epoll_init (EV_A_ 0)) return 0; + linuxaio_iteration = 0; + + if (linuxaio_io_setup (EV_A) < 0) + { + epoll_destroy (EV_A); + return 0; + } + + ev_io_init (EV_A_ &linuxaio_epoll_w, linuxaio_epoll_cb, backend_fd, EV_READ); + ev_set_priority (&linuxaio_epoll_w, EV_MAXPRI); + ev_io_start (EV_A_ &linuxaio_epoll_w); + ev_unref (EV_A); /* watcher should not keep loop alive */ + backend_modify = linuxaio_modify; backend_poll = linuxaio_poll; @@ -334,21 +517,31 @@ void linuxaio_destroy (EV_P) { + epoll_destroy (EV_A); linuxaio_free_iocbp (EV_A); - ev_io_destroy (linuxaio_ctx); + evsys_io_destroy (linuxaio_ctx); } inline_size void linuxaio_fork (EV_P) { - /* TODO: verify and test */ + /* this frees all iocbs, which is very heavy-handed */ linuxaio_destroy (EV_A); + linuxaio_submitcnt = 0; /* all pointers were invalidated */ - linuxaio_ctx = 0; - while (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) - ev_syserr ("(libev) io_setup"); + linuxaio_iteration = 0; /* we start over in the child */ + + while (linuxaio_io_setup (EV_A) < 0) + ev_syserr ("(libev) linuxaio io_setup"); + + epoll_fork (EV_A); + + ev_io_stop (EV_A_ &linuxaio_epoll_w); + ev_io_set (EV_A_ &linuxaio_epoll_w, backend_fd, EV_READ); + ev_io_start (EV_A_ &linuxaio_epoll_w); - fd_rearm_all (EV_A); + /* epoll_fork already did this. hopefully */ + /*fd_rearm_all (EV_A);*/ }