--- libev/ev_linuxaio.c 2019/06/25 06:43:04 1.29 +++ libev/ev_linuxaio.c 2019/07/01 20:35:17 1.40 @@ -102,7 +102,7 @@ * incompat_features are, or what header_length actually is for. */ #define AIO_RING_MAGIC 0xa10a10a1 -#define AIO_RING_INCOMPAT_FEATURES 0 +#define EV_AIO_RING_INCOMPAT_FEATURES 0 struct aio_ring { unsigned id; /* kernel internal index number */ @@ -118,39 +118,90 @@ struct io_event io_events[0]; }; +/* + * define some syscall wrappers for common architectures + * this is mostly for nice looks during debugging, not performance. + * our syscalls return < 0, not == -1, on error. which is good + * enough for linux aio. + * TODO: arm is also common nowadays, maybe even mips and x86 + * TODO: after implementing this, it suddenly looks like overkill, but its hard to remove... + */ +#if __GNUC__ && __linux && ECB_AMD64 && !defined __OPTIMIZE_SIZE__ + /* the costly errno access probably kills this for size optimisation */ + + #define ev_syscall(nr,narg,arg1,arg2,arg3,arg4,arg5) \ + ({ \ + long res; \ + register unsigned long r5 __asm__ ("r8" ); \ + register unsigned long r4 __asm__ ("r10"); \ + register unsigned long r3 __asm__ ("rdx"); \ + register unsigned long r2 __asm__ ("rsi"); \ + register unsigned long r1 __asm__ ("rdi"); \ + if (narg >= 5) r5 = (unsigned long)(arg5); \ + if (narg >= 4) r4 = (unsigned long)(arg4); \ + if (narg >= 3) r3 = (unsigned long)(arg3); \ + if (narg >= 2) r2 = (unsigned long)(arg2); \ + if (narg >= 1) r1 = (unsigned long)(arg1); \ + __asm__ __volatile__ ( \ + "syscall\n\t" \ + : "=a" (res) \ + : "0" (nr), "r" (r1), "r" (r2), "r" (r3), "r" (r4), "r" (r5) \ + : "cc", "r11", "cx", "memory"); \ + errno = -res; \ + res; \ + }) + +#endif + +#ifdef ev_syscall + #define ev_syscall0(nr) ev_syscall (nr, 0, 0, 0, 0, 0, 0 + #define ev_syscall1(nr,arg1) ev_syscall (nr, 1, arg1, 0, 0, 0, 0) + #define ev_syscall2(nr,arg1,arg2) ev_syscall (nr, 2, arg1, arg2, 0, 0, 0) + #define ev_syscall3(nr,arg1,arg2,arg3) ev_syscall (nr, 3, arg1, arg2, arg3, 0, 0) + #define ev_syscall4(nr,arg1,arg2,arg3,arg4) ev_syscall (nr, 3, arg1, arg2, arg3, arg4, 0) + #define ev_syscall5(nr,arg1,arg2,arg3,arg4,arg5) ev_syscall (nr, 5, arg1, arg2, arg3, arg4, arg5) +#else + #define ev_syscall0(nr) syscall (nr) + #define ev_syscall1(nr,arg1) syscall (nr, arg1) + #define ev_syscall2(nr,arg1,arg2) syscall (nr, arg1, arg2) + #define ev_syscall3(nr,arg1,arg2,arg3) syscall (nr, arg1, arg2, arg3) + #define ev_syscall4(nr,arg1,arg2,arg3,arg4) syscall (nr, arg1, arg2, arg3, arg4) + #define ev_syscall5(nr,arg1,arg2,arg3,arg4,arg5) syscall (nr, arg1, arg2, arg3, arg4, arg5) +#endif + inline_size int evsys_io_setup (unsigned nr_events, aio_context_t *ctx_idp) { - return syscall (SYS_io_setup, nr_events, ctx_idp); + return ev_syscall2 (SYS_io_setup, nr_events, ctx_idp); } inline_size int evsys_io_destroy (aio_context_t ctx_id) { - return syscall (SYS_io_destroy, ctx_id); + return ev_syscall1 (SYS_io_destroy, ctx_id); } inline_size int evsys_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) { - return syscall (SYS_io_submit, ctx_id, nr, cbp); + return ev_syscall3 (SYS_io_submit, ctx_id, nr, cbp); } inline_size int evsys_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) { - return syscall (SYS_io_cancel, ctx_id, cbp, result); + return ev_syscall3 (SYS_io_cancel, ctx_id, cbp, result); } inline_size int evsys_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) { - return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); + return ev_syscall5 (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); } /*****************************************************************************/ @@ -240,14 +291,27 @@ if (iocb->io.aio_reqprio < 0) { /* we handed this fd over to epoll, so undo this first */ - /* we do it manually because the optimisations on epoll_modfy won't do us any good */ + /* we do it manually because the optimisations on epoll_modify won't do us any good */ epoll_ctl (backend_fd, EPOLL_CTL_DEL, fd, 0); + anfds [fd].emask = 0; iocb->io.aio_reqprio = 0; } if (iocb->io.aio_buf) - /* io_cancel always returns some error on relevant kernels, but works */ - evsys_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0); + { + for (;;) + { + /* on all relevant kernels, io_cancel fails with EINPROGRESS on "success" */ + if (expect_false (evsys_io_cancel (linuxaio_ctx, &iocb->io, (struct io_event *)0) == 0)) + break; + + if (expect_true (errno == EINPROGRESS)) + break; + + /* the EINPROGRESS test is for nicer error message. clumsy. */ + assert (("libev: linuxaio unexpected io_cancel failed", errno != EINPROGRESS && errno != EINTR)); + } + } if (nev) { @@ -269,7 +333,8 @@ epoll_poll (EV_A_ 0); } -static void +inline_speed +void linuxaio_fd_rearm (EV_P_ int fd) { anfds [fd].events = 0; @@ -295,7 +360,7 @@ | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0) ); - /* linux aio is oneshot: rearm fd. TODO: this does more work than needed */ + /* linux aio is oneshot: rearm fd. TODO: this does more work than strictly needed */ linuxaio_fd_rearm (EV_A_ fd); --nr; @@ -318,12 +383,6 @@ if (head == tail) return 0; - /* bail out if the ring buffer doesn't match the expected layout */ - if (expect_false (ring->magic != AIO_RING_MAGIC) - || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES - || ring->header_length != sizeof (struct aio_ring)) /* TODO: or use it to find io_event[0]? */ - return 0; - /* make sure the events up to tail are visible */ ECB_MEMORY_FENCE_ACQUIRE; @@ -343,45 +402,88 @@ return 1; } +inline_size +int +linuxaio_ringbuf_valid (EV_P) +{ + struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx; + + return expect_true (ring->magic == AIO_RING_MAGIC) + && ring->incompat_features == EV_AIO_RING_INCOMPAT_FEATURES + && ring->header_length == sizeof (struct aio_ring); /* TODO: or use it to find io_event[0]? */ +} + /* read at least one event from kernel, or timeout */ inline_size void linuxaio_get_events (EV_P_ ev_tstamp timeout) { struct timespec ts; - struct io_event ioev[1]; - int res; + struct io_event ioev[8]; /* 256 octet stack space */ + int want = 1; /* how many events to request */ + int ringbuf_valid = linuxaio_ringbuf_valid (EV_A); + + if (expect_true (ringbuf_valid)) + { + /* if the ring buffer has any events, we don't wait or call the kernel at all */ + if (linuxaio_get_events_from_ring (EV_A)) + return; + + /* if the ring buffer is empty, and we don't have a timeout, then don't call the kernel */ + if (!timeout) + return; + } + else + /* no ringbuffer, request slightly larger batch */ + want = sizeof (ioev) / sizeof (ioev [0]); - if (linuxaio_get_events_from_ring (EV_A)) - return; + /* no events, so wait for some + * for fairness reasons, we do this in a loop, to fetch all events + */ + for (;;) + { + int res; - /* no events, so wait for at least one, then poll ring buffer again */ - /* this degrades to one event per loop iteration */ - /* if the ring buffer changes layout, but so be it */ + EV_RELEASE_CB; - EV_RELEASE_CB; + ts.tv_sec = (long)timeout; + ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); - ts.tv_sec = (long)timeout; - ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); + res = evsys_io_getevents (linuxaio_ctx, 1, want, ioev, &ts); - res = evsys_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts); + EV_ACQUIRE_CB; - EV_ACQUIRE_CB; + if (res < 0) + if (errno == EINTR) + /* ignored, retry */; + else + ev_syserr ("(libev) linuxaio io_getevents"); + else if (res) + { + /* at least one event available, handle them */ + linuxaio_parse_events (EV_A_ ioev, res); - if (res < 0) - if (errno == EINTR) - /* ignored */; - else - ev_syserr ("(libev) linuxaio io_getevents"); - else if (res) - { - /* at least one event received, handle it and any remaining ones in the ring buffer */ - linuxaio_parse_events (EV_A_ ioev, res); - linuxaio_get_events_from_ring (EV_A); + if (expect_true (ringbuf_valid)) + { + /* if we have a ring buffer, handle any remaining events in it */ + linuxaio_get_events_from_ring (EV_A); + + /* at this point, we should have handled all outstanding events */ + break; + } + else if (res < want) + /* otherwise, if there were fewere events than we wanted, we assume there are no more */ + break; + } + else + break; /* no events from the kernel, we are done */ + + timeout = 0; /* only wait in the first iteration */ } } -static int +inline_size +int linuxaio_io_setup (EV_P) { linuxaio_ctx = 0; @@ -457,10 +559,13 @@ } else if (errno == EBADF) { + assert (("libev: event loop rejected bad fd", errno != EBADF)); fd_kill (EV_A_ linuxaio_submits [submitted]->aio_fildes); res = 1; /* skip this iocb */ } + else if (errno == EINTR) /* not seen in reality, not documented */ + res = 0; /* silently ignore and retry */ else ev_syserr ("(libev) linuxaio io_submit"); @@ -519,7 +624,7 @@ { epoll_destroy (EV_A); linuxaio_free_iocbp (EV_A); - evsys_io_destroy (linuxaio_ctx); + evsys_io_destroy (linuxaio_ctx); /* fails in child, aio context is destroyed */ } inline_size @@ -535,6 +640,7 @@ while (linuxaio_io_setup (EV_A) < 0) ev_syserr ("(libev) linuxaio io_setup"); + /* forking epoll should also effectively unregister all fds from the backend */ epoll_fork (EV_A); ev_io_stop (EV_A_ &linuxaio_epoll_w);