ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libev/ev_linuxaio.c
(Generate patch)

Comparing libev/ev_linuxaio.c (file contents):
Revision 1.5 by root, Fri Jun 21 03:41:40 2019 UTC vs.
Revision 1.9 by root, Sat Jun 22 22:29:38 2019 UTC

39 39
40#include <sys/time.h> /* actually linux/time.h, but we must assume they are compatible */ 40#include <sys/time.h> /* actually linux/time.h, but we must assume they are compatible */
41#include <poll.h> 41#include <poll.h>
42#include <linux/aio_abi.h> 42#include <linux/aio_abi.h>
43 43
44/* we try to fill 4kn pages exactly. 44/* we try to fill 4kB pages exactly.
45 * the ring buffer header is 32 bytes, every io event is 32 bytes. 45 * the ring buffer header is 32 bytes, every io event is 32 bytes.
46 * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer 46 * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer.
47 * so the calculation below will use "exactly" 8kB for the ring buffer 47 * therefore the calculation below will use "exactly" 4kB for the ring buffer
48 */ 48 */
49#define EV_LINUXAIO_DEPTH (256 / 2 - 2 - 1) /* max. number of io events per batch */ 49#define EV_LINUXAIO_DEPTH (128 / 2 - 2 - 1) /* max. number of io events per batch */
50 50
51/*****************************************************************************/ 51/*****************************************************************************/
52/* syscall wrapdadoop */ 52/* syscall wrapdadoop */
53 53
54#include <sys/syscall.h> /* no glibc wrappers */ 54#include <sys/syscall.h> /* no glibc wrappers */
72 unsigned header_length; /* size of aio_ring */ 72 unsigned header_length; /* size of aio_ring */
73 73
74 struct io_event io_events[0]; 74 struct io_event io_events[0];
75}; 75};
76 76
77static int 77inline_size
78int
78ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp) 79ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp)
79{ 80{
80 return syscall (SYS_io_setup, nr_events, ctx_idp); 81 return syscall (SYS_io_setup, nr_events, ctx_idp);
81} 82}
82 83
83static int 84inline_size
85int
84ev_io_destroy (aio_context_t ctx_id) 86ev_io_destroy (aio_context_t ctx_id)
85{ 87{
86 return syscall (SYS_io_destroy, ctx_id); 88 return syscall (SYS_io_destroy, ctx_id);
87} 89}
88 90
89static int 91inline_size
92int
90ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[]) 93ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[])
91{ 94{
92 return syscall (SYS_io_submit, ctx_id, nr, cbp); 95 return syscall (SYS_io_submit, ctx_id, nr, cbp);
93} 96}
94 97
95static int 98inline_size
99int
96ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result) 100ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result)
97{ 101{
98 return syscall (SYS_io_cancel, ctx_id, cbp, result); 102 return syscall (SYS_io_cancel, ctx_id, cbp, result);
99} 103}
100 104
101static int 105inline_size
106int
102ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout) 107ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout)
103{ 108{
104 return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout); 109 return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout);
105} 110}
106
107typedef void (*ev_io_cb) (long nr, struct io_event *events);
108 111
109/*****************************************************************************/ 112/*****************************************************************************/
110/* actual backed implementation */ 113/* actual backed implementation */
111 114
112/* two iocbs for every fd, one for read, one for write */ 115/* we use out own wrapper structure in acse we ever want to do something "clever" */
113typedef struct aniocb 116typedef struct aniocb
114{ 117{
115 struct iocb io; 118 struct iocb io;
116 /*int inuse;*/ 119 /*int inuse;*/
117} *ANIOCBP; 120} *ANIOCBP;
118 121
119inline_size 122inline_size
120void 123void
121linuxaio_array_needsize_iocbp (ANIOCBP *base, int count) 124linuxaio_array_needsize_iocbp (ANIOCBP *base, int count)
122{ 125{
123 /* TODO: quite the overhead to allocate every iocb separately */ 126 /* TODO: quite the overhead to allocate every iocb separately, maybe use our own alocator? */
124 while (count--) 127 while (count--)
125 { 128 {
126 *base = (ANIOCBP)ev_malloc (sizeof (**base)); 129 *base = (ANIOCBP)ev_malloc (sizeof (**base));
127 /* TODO: full zero initialize required? */ 130 /* TODO: full zero initialize required? */
128 memset (*base, 0, sizeof (**base)); 131 memset (*base, 0, sizeof (**base));
129 /* would be nice to initialize fd/data as well */ 132 /* would be nice to initialize fd/data as well, but array_needsize API doesn't support that */
130 (*base)->io.aio_lio_opcode = IOCB_CMD_POLL; 133 (*base)->io.aio_lio_opcode = IOCB_CMD_POLL;
131 ++base; 134 ++base;
132 } 135 }
133} 136}
134 137
138ecb_cold
135static void 139static void
136linuxaio_free_iocbp (EV_P) 140linuxaio_free_iocbp (EV_P)
137{ 141{
138 while (linuxaio_iocbpmax--) 142 while (linuxaio_iocbpmax--)
139 ev_free (linuxaio_iocbps [linuxaio_iocbpmax]); 143 ev_free (linuxaio_iocbps [linuxaio_iocbpmax]);
140 144
141 /* next resize will completely reallocate the array */ 145 linuxaio_iocbpmax = 0; /* next resize will completely reallocate the array, at some overhead */
142 linuxaio_iocbpmax = 0;
143 linuxaio_submitcnt = 0; /* all pointers invalidated */
144} 146}
145 147
146static void 148static void
147linuxaio_modify (EV_P_ int fd, int oev, int nev) 149linuxaio_modify (EV_P_ int fd, int oev, int nev)
148{ 150{
203static int 205static int
204linuxaio_get_events_from_ring (EV_P) 206linuxaio_get_events_from_ring (EV_P)
205{ 207{
206 struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx; 208 struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx;
207 209
208 ECB_MEMORY_FENCE_ACQUIRE;
209
210 unsigned head = ring->head; 210 unsigned head = ring->head;
211 unsigned tail = *(volatile unsigned *)&ring->tail; 211 unsigned tail = *(volatile unsigned *)&ring->tail;
212 212
213 if (ring->magic != AIO_RING_MAGIC 213 if (head == tail)
214 || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES
215 || ring->header_length != sizeof (struct aio_ring) /* TODO: or use it to find io_event[0]? */
216 || head == tail)
217 return 0; 214 return 0;
215
216 /* bail out if the ring buffer doesn't match the expected layout */
217 if (ecb_expect_false (ring->magic != AIO_RING_MAGIC)
218 || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES
219 || ring->header_length != sizeof (struct aio_ring)) /* TODO: or use it to find io_event[0]? */
220 return 0;
221
222 ECB_MEMORY_FENCE_ACQUIRE;
218 223
219 /* parse all available events, but only once, to avoid starvation */ 224 /* parse all available events, but only once, to avoid starvation */
220 if (tail > head) /* normal case around */ 225 if (tail > head) /* normal case around */
221 linuxaio_parse_events (EV_A_ ring->io_events + head, tail - head); 226 linuxaio_parse_events (EV_A_ ring->io_events + head, tail - head);
222 else
223 {
224 /* wrapped around */ 227 else /* wrapped around */
228 {
225 linuxaio_parse_events (EV_A_ ring->io_events + head, ring->nr - head); 229 linuxaio_parse_events (EV_A_ ring->io_events + head, ring->nr - head);
226 linuxaio_parse_events (EV_A_ ring->io_events, tail); 230 linuxaio_parse_events (EV_A_ ring->io_events, tail);
227 } 231 }
228 232
229 ring->head = tail; 233 ring->head = tail;
242 246
243 if (linuxaio_get_events_from_ring (EV_A)) 247 if (linuxaio_get_events_from_ring (EV_A))
244 return; 248 return;
245 249
246 /* no events, so wait for at least one, then poll ring buffer again */ 250 /* no events, so wait for at least one, then poll ring buffer again */
247 /* this degraded to one event per loop iteration */ 251 /* this degrades to one event per loop iteration */
248 /* if the ring buffer changes layout, but so be it */ 252 /* if the ring buffer changes layout, but so be it */
249 253
250 ts.tv_sec = (long)timeout; 254 ts.tv_sec = (long)timeout;
251 ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); 255 ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9);
252 256
253 res = ev_io_getevents (linuxaio_ctx, 1, 1, &ioev, &ts); 257 res = ev_io_getevents (linuxaio_ctx, 1, 1, &ioev, &ts);
254 258
255 if (res < 0) 259 if (res < 0)
256 ev_syserr ("(libev) io_getevents"); 260 ev_syserr ("(libev) linuxaio io_getevents");
257 else if (res) 261 else if (res)
258 { 262 {
259 /* at least one event received, handle it and any remaining ones in the ring buffer */ 263 /* at least one event received, handle it and any remaining ones in the ring buffer */
260 linuxaio_parse_events (EV_A_ &ioev, 1); 264 linuxaio_parse_events (EV_A_ &ioev, 1);
261 linuxaio_get_events_from_ring (EV_A); 265 linuxaio_get_events_from_ring (EV_A);
274 /* which allows us to pinpoint the errornous iocb */ 278 /* which allows us to pinpoint the errornous iocb */
275 for (submitted = 0; submitted < linuxaio_submitcnt; ) 279 for (submitted = 0; submitted < linuxaio_submitcnt; )
276 { 280 {
277 int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); 281 int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted);
278 282
279 if (res < 0) 283 if (ecb_expect_false (res < 0))
280 if (errno == EAGAIN) 284 if (errno == EAGAIN)
281 { 285 {
282 /* This happens when the ring buffer is full, at least. I assume this means 286 /* This happens when the ring buffer is full, at least. I assume this means
283 * that the event was queued synchronously during io_submit, and thus 287 * that the event was queued synchronously during io_submit, and thus
284 * the buffer overflowd. 288 * the buffer overflowd.
285 * In this case, we just try next loop iteration. 289 * In this case, we just try next loop iteration.
290 * This should not result in a few fds taking priority, as the interface
291 * is one-shot, and we submit iocb's in a round-robin fashion.
286 */ 292 */
287 memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); 293 memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits));
288 linuxaio_submitcnt -= submitted; 294 linuxaio_submitcnt -= submitted;
289 timeout = 0; 295 timeout = 0;
290 break; 296 break;
291 } 297 }
292 else 298 else
293 /* TODO: we get EAGAIN when the ring buffer is full for some reason */
294 /* TODO: should we always just try next time? */
295 ev_syserr ("(libev) io_submit"); 299 ev_syserr ("(libev) linuxaio io_submit");
296 300
297 submitted += res; 301 submitted += res;
298 } 302 }
299 303
300 linuxaio_submitcnt = 0; 304 linuxaio_submitcnt = 0;
340 344
341inline_size 345inline_size
342void 346void
343linuxaio_fork (EV_P) 347linuxaio_fork (EV_P)
344{ 348{
345 /* TODO: verify and test */ 349 /* this frees all iocbs, which is very heavy-handed */
346 linuxaio_destroy (EV_A); 350 linuxaio_destroy (EV_A);
351 linuxaio_submitcnt = 0; /* all pointers were invalidated */
347 352
348 linuxaio_ctx = 0; 353 linuxaio_ctx = 0;
349 while (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0) 354 while (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0)
350 ev_syserr ("(libev) io_setup"); 355 ev_syserr ("(libev) linuxaio io_setup");
351 356
352 fd_rearm_all (EV_A); 357 fd_rearm_all (EV_A);
353} 358}
354 359

Diff Legend

- Removed lines
+ Added lines
< Changed lines (left revision)
> Changed lines (right revision)