--- libev/ev_linuxaio.c 2019/06/21 03:41:40 1.5 +++ libev/ev_linuxaio.c 2019/06/22 22:29:38 1.9 @@ -41,12 +41,12 @@ #include #include -/* we try to fill 4kn pages exactly. +/* we try to fill 4kB pages exactly. * the ring buffer header is 32 bytes, every io event is 32 bytes. - * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer - * so the calculation below will use "exactly" 8kB for the ring buffer + * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer. + * therefore the calculation below will use "exactly" 4kB for the ring buffer */ -#define EV_LINUXAIO_DEPTH (256 / 2 - 2 - 1) /* max. number of io events per batch */ +#define EV_LINUXAIO_DEPTH (128 / 2 - 2 - 1) /* max. number of io events per batch */ /*****************************************************************************/ /* syscall wrapdadoop */ @@ -74,42 +74,45 @@ struct io_event io_events[0]; }; -static int +inline_size +int ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp) { return syscall (SYS_io_setup, nr_events, ctx_idp); } -static int +inline_size +int ev_io_destroy (aio_context_t ctx_id) { return syscall (SYS_io_destroy, ctx_id); } -static int +inline_size +int ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) { return syscall (SYS_io_submit, ctx_id, nr, cbp); } -static int +inline_size +int ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) { return syscall (SYS_io_cancel, ctx_id, cbp, result); } -static int +inline_size +int ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) { return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); } -typedef void (*ev_io_cb) (long nr, struct io_event *events); - /*****************************************************************************/ /* actual backed implementation */ -/* two iocbs for every fd, one for read, one for write */ +/* we use our own wrapper structure in case we 
ever want to do something "clever" */ typedef struct aniocb { struct iocb io; @@ -120,27 +123,26 @@ void linuxaio_array_needsize_iocbp (ANIOCBP *base, int count) { - /* TODO: quite the overhead to allocate every iocb separately */ + /* TODO: quite the overhead to allocate every iocb separately, maybe use our own allocator? */ while (count--) { *base = (ANIOCBP)ev_malloc (sizeof (**base)); /* TODO: full zero initialize required? */ memset (*base, 0, sizeof (**base)); - /* would be nice to initialize fd/data as well */ + /* would be nice to initialize fd/data as well, but array_needsize API doesn't support that */ (*base)->io.aio_lio_opcode = IOCB_CMD_POLL; ++base; } } +ecb_cold static void linuxaio_free_iocbp (EV_P) { while (linuxaio_iocbpmax--) ev_free (linuxaio_iocbps [linuxaio_iocbpmax]); - /* next resize will completely reallocate the array */ - linuxaio_iocbpmax = 0; - linuxaio_submitcnt = 0; /* all pointers invalidated */ + linuxaio_iocbpmax = 0; /* next resize will completely reallocate the array, at some overhead */ } static void @@ -205,23 +207,25 @@ { struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx; - ECB_MEMORY_FENCE_ACQUIRE; - unsigned head = ring->head; unsigned tail = *(volatile unsigned *)&ring->tail; - if (ring->magic != AIO_RING_MAGIC - || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES - || ring->header_length != sizeof (struct aio_ring) /* TODO: or use it to find io_event[0]? */ - || head == tail) + if (head == tail) + return 0; + + /* bail out if the ring buffer doesn't match the expected layout */ + if (ecb_expect_false (ring->magic != AIO_RING_MAGIC) + || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES + || ring->header_length != sizeof (struct aio_ring)) /* TODO: or use it to find io_event[0]? 
*/ return 0; + ECB_MEMORY_FENCE_ACQUIRE; + /* parse all available events, but only once, to avoid starvation */ if (tail > head) /* normal case around */ linuxaio_parse_events (EV_A_ ring->io_events + head, tail - head); - else + else /* wrapped around */ { - /* wrapped around */ linuxaio_parse_events (EV_A_ ring->io_events + head, ring->nr - head); linuxaio_parse_events (EV_A_ ring->io_events, tail); } @@ -244,7 +248,7 @@ return; /* no events, so wait for at least one, then poll ring buffer again */ - /* this degraded to one event per loop iteration */ + /* this degrades to one event per loop iteration */ /* if the ring buffer changes layout, but so be it */ ts.tv_sec = (long)timeout; @@ -253,7 +257,7 @@ res = ev_io_getevents (linuxaio_ctx, 1, 1, &ioev, &ts); if (res < 0) - ev_syserr ("(libev) io_getevents"); + ev_syserr ("(libev) linuxaio io_getevents"); else if (res) { /* at least one event received, handle it and any remaining ones in the ring buffer */ @@ -276,13 +280,15 @@ { int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); - if (res < 0) + if (ecb_expect_false (res < 0)) if (errno == EAGAIN) { /* This happens when the ring buffer is full, at least. I assume this means * that the event was queued synchronously during io_submit, and thus * the buffer overflowd. * In this case, we just try next loop iteration. + * This should not result in a few fds taking priority, as the interface + * is one-shot, and we submit iocb's in a round-robin fashion. */ memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); linuxaio_submitcnt -= submitted; @@ -290,9 +296,7 @@ break; } else - /* TODO: we get EAGAIN when the ring buffer is full for some reason */ - /* TODO: should we always just try next time? 
*/ - ev_syserr ("(libev) io_submit"); + ev_syserr ("(libev) linuxaio io_submit"); submitted += res; } @@ -342,12 +346,13 @@ void linuxaio_fork (EV_P) { - /* TODO: verify and test */ + /* this frees all iocbs, which is very heavy-handed */ linuxaio_destroy (EV_A); + linuxaio_submitcnt = 0; /* all pointers were invalidated */ linuxaio_ctx = 0; while (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) - ev_syserr ("(libev) io_setup"); + ev_syserr ("(libev) linuxaio io_setup"); fd_rearm_all (EV_A); }