ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libev/ev_linuxaio.c
(Generate patch)

Comparing libev/ev_linuxaio.c (file contents):
Revision 1.18 by root, Mon Jun 24 04:54:08 2019 UTC vs.
Revision 1.19 by root, Mon Jun 24 19:53:47 2019 UTC

182 array_needsize (struct iocb *, linuxaio_submits, linuxaio_submitmax, linuxaio_submitcnt, array_needsize_noinit); 182 array_needsize (struct iocb *, linuxaio_submits, linuxaio_submitmax, linuxaio_submitcnt, array_needsize_noinit);
183 linuxaio_submits [linuxaio_submitcnt - 1] = &iocb->io; 183 linuxaio_submits [linuxaio_submitcnt - 1] = &iocb->io;
184 } 184 }
185} 185}
186 186
187#if EPOLL_FALLBACK
188
189static void
190linuxaio_rearm_epoll (EV_P_ struct iocb *iocb, int op)
191{
192 struct epoll_event eev;
193
194 eev.events = EPOLLONESHOT;
195 if (iocb->aio_buf & POLLIN ) eev.events |= EPOLLIN ;
196 if (iocb->aio_buf & POLLOUT) eev.events |= EPOLLOUT;
197 eev.data.fd = iocb->aio_fildes;
198
199 if (epoll_ctl (backend_fd, op, iocb->aio_fildes, &eev) < 0)
200 ev_syserr ("(libeio) linuxaio epoll_ctl");
201}
202
203static void
204linuxaio_epoll_cb (EV_P_ struct ev_io *w, int revents)
205{
206 struct epoll_event events[16];
207
208 for (;;)
209 {
210 int idx;
211 int res = epoll_wait (backend_fd, events, sizeof (events) / sizeof (events [0]), 0);
212
213 if (expect_false (res < 0))
214 ev_syserr ("(libev) linuxaio epoll_wait");
215 else if (!res)
216 break;
217
218 for (idx = res; idx--; )
219 {
220 int fd = events [idx].data.fd;
221 uint32_t ev = events [idx].events;
222
223 assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax));
224
225 linuxaio_rearm_epoll (EV_A_ &linuxaio_iocbps [fd]->io, EPOLL_CTL_MOD);
226
227 fd_event (EV_A_ fd,
228 (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
229 | (ev & (EPOLLIN | EPOLLERR | EPOLLHUP) ? EV_READ : 0));
230 }
231
232 if (res < sizeof (events) / sizeof (events [0]))
233 break;
234 }
235}
236
237#endif
238
187static void 239static void
188linuxaio_parse_events (EV_P_ struct io_event *ev, int nr) 240linuxaio_parse_events (EV_P_ struct io_event *ev, int nr)
189{ 241{
190 while (nr) 242 while (nr)
191 { 243 {
192 int fd = ev->data; 244 int fd = ev->data;
193 int res = ev->res; 245 int res = ev->res;
194 246
195 assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax)); 247 assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax));
196 248
197 /* linux aio is oneshot: rearm fd */ 249 /* linux aio is oneshot: rearm fd. TODO: this does more work than needed */
198 linuxaio_iocbps [fd]->io.aio_buf = 0; 250 linuxaio_iocbps [fd]->io.aio_buf = 0;
199 anfds [fd].events = 0; 251 anfds [fd].events = 0;
200 fd_change (EV_A_ fd, 0); 252 fd_change (EV_A_ fd, 0);
201 253
202 /* feed events, we do not expect or handle POLLNVAL */ 254 /* feed events, we do not expect or handle POLLNVAL */
272 324
273 /* no events, so wait for at least one, then poll ring buffer again */ 325 /* no events, so wait for at least one, then poll ring buffer again */
274 /* this degrades to one event per loop iteration */ 326 /* this degrades to one event per loop iteration */
275 /* if the ring buffer changes layout, but so be it */ 327 /* if the ring buffer changes layout, but so be it */
276 328
329 EV_RELEASE_CB;
330
277 ts.tv_sec = (long)timeout; 331 ts.tv_sec = (long)timeout;
278 ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9); 332 ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9);
279 333
280 res = ev_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts); 334 res = ev_io_getevents (linuxaio_ctx, 1, sizeof (ioev) / sizeof (ioev [0]), ioev, &ts);
335
336 EV_ACQUIRE_CB;
281 337
282 if (res < 0) 338 if (res < 0)
283 if (errno == EINTR) 339 if (errno == EINTR)
284 /* ignored */; 340 /* ignored */;
285 else 341 else
290 linuxaio_parse_events (EV_A_ ioev, res); 346 linuxaio_parse_events (EV_A_ ioev, res);
291 linuxaio_get_events_from_ring (EV_A); 347 linuxaio_get_events_from_ring (EV_A);
292 } 348 }
293} 349}
294 350
295#if EPOLL_FALLBACK
/* Revision 1.18 copy (removed in 1.19, where this function was moved
 * earlier in the file): registers/re-arms an aio-rejected fd with the
 * epoll fallback backend; iocb->aio_buf carries the poll mask.
 * NOTE(review): "(libeio)" below looks like a typo for "(libev)" -
 * every other ev_syserr message in this file says "(libev)". */
296static void
297linuxaio_rearm_epoll (EV_P_ struct iocb *iocb, int op)
298{
299 struct epoll_event eev;
300
 /* oneshot: the fd must be explicitly re-armed after every event */
301 eev.events = EPOLLONESHOT;
302 if (iocb->aio_buf & POLLIN ) eev.events |= EPOLLIN ;
303 if (iocb->aio_buf & POLLOUT) eev.events |= EPOLLOUT;
304 eev.data.fd = iocb->aio_fildes;
305
306 if (epoll_ctl (backend_fd, op, iocb->aio_fildes, &eev) < 0)
307 ev_syserr ("(libeio) linuxaio epoll_ctl");
308}
309#endif
310
311static void 351static void
312linuxaio_poll (EV_P_ ev_tstamp timeout) 352linuxaio_poll (EV_P_ ev_tstamp timeout)
313{ 353{
314 int submitted; 354 int submitted;
315 355
318 /* io_submit might return less than the requested number of iocbs */ 358 /* io_submit might return less than the requested number of iocbs */
319 /* this is, afaics, only because of errors, but we go by the book and use a loop, */ 359 /* this is, afaics, only because of errors, but we go by the book and use a loop, */
320 /* which allows us to pinpoint the erroneous iocb */ 360 /* which allows us to pinpoint the erroneous iocb */
321 for (submitted = 0; submitted < linuxaio_submitcnt; ) 361 for (submitted = 0; submitted < linuxaio_submitcnt; )
322 { 362 {
363#if 0
364 int res;
365 if (linuxaio_submits[submitted]->aio_fildes == backend_fd)
366 res = ev_io_submit (linuxaio_ctx, 1, linuxaio_submits + submitted);
367 else
368 { res = -1; errno = EINVAL; };
369#else
323 int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted); 370 int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted);
371#endif
324 372
325 if (expect_false (res < 0)) 373 if (expect_false (res < 0))
326 if (errno == EAGAIN) 374 if (errno == EAGAIN)
327 { 375 {
328 /* This happens when the ring buffer is full, at least. I assume this means 376 /* This happens when the ring buffer is full, at least. I assume this means
329 * that the event was queued synchronously during io_submit, and thus 377 * that the event was queued synchronously during io_submit, and thus
330 * the buffer overflowed. 378 * the buffer overflowed.
331 * In this case, we just try in next loop iteration. 379 * In this case, we just try in next loop iteration.
332 * This should not result in a few fds taking priority, as the interface 380 * This should not result in a few fds taking priority, as the interface
333 * is one-shot, and we submit iocb's in a round-robin fashion. 381 * is one-shot, and we submit iocb's in a round-robin fashion.
382 * TODO: maybe make "submitted" persistent, so we don't have to memmove?
334 */ 383 */
384 if (ecb_expect_false (submitted))
385 {
335 memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits)); 386 memmove (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits));
336 linuxaio_submitcnt -= submitted; 387 linuxaio_submitcnt -= submitted;
388 }
389
337 timeout = 0; 390 timeout = 0;
338 break; 391 break;
339 } 392 }
340#if EPOLL_FALLBACK 393#if EPOLL_FALLBACK
341 else if (errno == EINVAL) 394 else if (errno == EINVAL)
342 { 395 {
343 /* This happens for unsupported fds, officially, but in my testing, 396 /* This happens for unsupported fds, officially, but in my testing,
344 * also randomly happens for supported fds. We fall back to good old 397 * also randomly happens for supported fds. We fall back to good old
345 * poll() here, under the assumption that this is a very rare case. 398 * poll() here, under the assumption that this is a very rare case.
346 * See https://lore.kernel.org/patchwork/patch/1047453/ for evidence 399 * See https://lore.kernel.org/patchwork/patch/1047453/ to see
347 * that the problem is known, but ignored. 400 * discussion about such a case (ttys) where polling for POLLIN
401 * fails but POLLIN|POLLOUT works.
348 */ 402 */
349 struct iocb *iocb = linuxaio_submits [submitted]; 403 struct iocb *iocb = linuxaio_submits [submitted];
350 res = 1; /* skip this iocb */ 404 res = 1; /* skip this iocb */
351 405
352 linuxaio_rearm_epoll (EV_A_ iocb, EPOLL_CTL_ADD); 406 linuxaio_rearm_epoll (EV_A_ iocb, EPOLL_CTL_ADD);
363 417
364 /* second phase: fetch and parse events */ 418 /* second phase: fetch and parse events */
365 419
366 linuxaio_get_events (EV_A_ timeout); 420 linuxaio_get_events (EV_A_ timeout);
367} 421}
368
369#if EPOLL_FALLBACK
370
/* Revision 1.18 copy (removed in 1.19, where this function was moved
 * earlier in the file): drains pending epoll events for fds handed over
 * to the epoll fallback; called with a zero timeout, so it never blocks. */
371static void
372linuxaio_epoll_cb (EV_P_ struct ev_io *w, int revents)
373{
374 struct epoll_event events[16];
375
376 for (;;)
377 {
378 int idx;
379 int res = epoll_wait (backend_fd, events, sizeof (events) / sizeof (events [0]), 0);
380
381 if (expect_false (res < 0))
382 ev_syserr ("(libev) linuxaio epoll_wait");
383 else if (!res)
384 break;
385
386 for (idx = res; idx--; )
387 {
388 int fd = events [idx].data.fd;
389 uint32_t ev = events [idx].events;
390
391 assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdmax));
392
 /* epoll registration is oneshot - re-arm before feeding the event */
393 linuxaio_rearm_epoll (EV_A_ &linuxaio_iocbps [fd]->io, EPOLL_CTL_MOD);
394
395 fd_event (EV_A_ fd,
396 (ev & (EPOLLOUT | EPOLLERR | EPOLLHUP) ? EV_WRITE : 0)
397 | (ev & (EPOLLIN | EPOLLERR | EPOLLHUP) ? EV_READ : 0));
398 }
399
 /* NOTE(review): int res vs size_t count is a signed/unsigned
  * comparison (-Wsign-compare); harmless here since res >= 0 */
400 if (res < sizeof (events) / sizeof (events [0]))
401 break;
402 }
403}
404
405#endif
406 422
407inline_size 423inline_size
408int 424int
409linuxaio_init (EV_P_ int flags) 425linuxaio_init (EV_P_ int flags)
410{ 426{
431 ev_io_destroy (linuxaio_ctx); 447 ev_io_destroy (linuxaio_ctx);
432 return 0; 448 return 0;
433 } 449 }
434 450
435 ev_io_init (EV_A_ &linuxaio_epoll_w, linuxaio_epoll_cb, backend_fd, EV_READ); 451 ev_io_init (EV_A_ &linuxaio_epoll_w, linuxaio_epoll_cb, backend_fd, EV_READ);
452 ev_set_priority (&linuxaio_epoll_w, EV_MAXPRI);
436 ev_io_start (EV_A_ &linuxaio_epoll_w); 453 ev_io_start (EV_A_ &linuxaio_epoll_w);
437 ev_unref (EV_A); /* watcher should not keep loop alive */ 454 ev_unref (EV_A); /* watcher should not keep loop alive */
438#endif 455#endif
439 456
440 backend_modify = linuxaio_modify; 457 backend_modify = linuxaio_modify;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines