ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/libev/ev_linuxaio.c
Revision: 1.1
Committed: Thu Jun 20 22:44:59 2019 UTC (4 years, 10 months ago) by root
Content type: text/plain
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 /*
2     * libev linux aio fd activity backend
3     *
4     * Copyright (c) 2019 Marc Alexander Lehmann <libev@schmorp.de>
5     * All rights reserved.
6     *
7     * Redistribution and use in source and binary forms, with or without modifica-
8     * tion, are permitted provided that the following conditions are met:
9     *
10     * 1. Redistributions of source code must retain the above copyright notice,
11     * this list of conditions and the following disclaimer.
12     *
13     * 2. Redistributions in binary form must reproduce the above copyright
14     * notice, this list of conditions and the following disclaimer in the
15     * documentation and/or other materials provided with the distribution.
16     *
17     * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
18     * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MER-
19     * CHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
20     * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPE-
21     * CIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
22     * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
23     * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
24     * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTH-
25     * ERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
26     * OF THE POSSIBILITY OF SUCH DAMAGE.
27     *
28     * Alternatively, the contents of this file may be used under the terms of
29     * the GNU General Public License ("GPL") version 2 or any later version,
30     * in which case the provisions of the GPL are applicable instead of
31     * the above. If you wish to allow the use of your version of this file
32     * only under the terms of the GPL and not to allow others to use your
33     * version of this file under the BSD license, indicate your decision
34     * by deleting the provisions above and replace them with the notice
35     * and other provisions required by the GPL. If you do not delete the
36     * provisions above, a recipient may use your version of this file under
37     * either the BSD or the GPL.
38     */
39    
40     #include <sys/time.h> /* actually linux/time.h, but we must assume they are compatible */
41     #include <linux/aio_abi.h>
42    
43     /* we try to fill 4kB pages exactly.
44     * the ring buffer header is 32 bytes, every io event is 32 bytes.
45     * the kernel takes the io event number, doubles it, adds 2, adds the ring buffer
46     * header, so the calculation below will use "exactly" 8kB for the ring buffer
47     */
48     #define EV_LINUXAIO_DEPTH (256 / 2 - 2 - 1) /* max. number of io events per batch */
49    
50     /*****************************************************************************/
51     /* syscall wrapdadoop */
52    
53     #include <sys/syscall.h> /* no glibc wrappers */
54    
55     /* aio_abi.h is not versioned in any way, so we cannot test for its existence */
56     #define IOCB_CMD_POLL 5
57    
58     /* taken from linux/fs/aio.c */
59     #define AIO_RING_MAGIC 0xa10a10a1
60     #define AIO_RING_INCOMPAT_FEATURES 0
61     struct aio_ring
62     {
63     unsigned id; /* kernel internal index number */
64     unsigned nr; /* number of io_events */
65     unsigned head; /* Written to by userland or by kernel. */
66     unsigned tail;
67    
68     unsigned magic;
69     unsigned compat_features;
70     unsigned incompat_features;
71     unsigned header_length; /* size of aio_ring */
72    
73     struct io_event io_events[0];
74     };
75    
76     static int
77     ev_io_setup (unsigned nr_events, aio_context_t *ctx_idp)
78     {
79     return syscall (SYS_io_setup, nr_events, ctx_idp);
80     }
81    
82     static int
83     ev_io_destroy (aio_context_t ctx_id)
84     {
85     return syscall (SYS_io_destroy, ctx_id);
86     }
87    
88     static int
89     ev_io_submit (aio_context_t ctx_id, long nr, struct iocb *cbp[])
90     {
91     return syscall (SYS_io_submit, ctx_id, nr, cbp);
92     }
93    
94     static int
95     ev_io_cancel (aio_context_t ctx_id, struct iocb *cbp, struct io_event *result)
96     {
97     return syscall (SYS_io_cancel, ctx_id, cbp, result);
98     }
99    
100     static int
101     ev_io_getevents (aio_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout)
102     {
103     return syscall (SYS_io_getevents, ctx_id, min_nr, nr, events, timeout);
104     }
105    
/* callback type taking an event count and a pointer to that many io_events */
typedef void (*ev_io_cb) (long count, struct io_event *evs);
107    
108     /*****************************************************************************/
109     /* actual backend implementation */
110    
111     /* two iocbs for every fd, one for read, one for write */
112     typedef struct aniocb
113     {
114     struct iocb io;
115     /*int inuse;*/
116     } *ANIOCBP;
117    
118     inline_size
119     void
120     linuxaio_array_needsize_iocbp (ANIOCBP *base, int count)
121     {
122     /* TODO: quite the overhead to allocate every iocb separately */
123     while (count--)
124     {
125     *base = (ANIOCBP)ev_malloc (sizeof (**base));
126     memset (*base, 0, sizeof (**base));
127     /* would be nice to initialize fd/data as well */
128     (*base)->io.aio_lio_opcode = IOCB_CMD_POLL;
129     ++base;
130     }
131     }
132    
133     static void
134     linuxaio_free_iocbp (EV_P)
135     {
136     while (linuxaio_iocbpmax--)
137     ev_free (linuxaio_iocbps [linuxaio_iocbpmax]);
138    
139     linuxaio_iocbpmax = 0;
140     }
141    
142     static void
143     linuxaio_modify (EV_P_ int fd, int oev, int nev)
144     {
145     /* TODO: full zero initialize required? */
146     array_needsize (ANIOCBP, linuxaio_iocbps, linuxaio_iocbpmax, fd + 1, linuxaio_array_needsize_iocbp);
147     struct aniocb *iocb = linuxaio_iocbps [fd];
148    
149     if (iocb->io.aio_buf)
150     ev_io_cancel (linuxaio_ctx, &iocb->io, (void *)0);
151    
152     if (nev)
153     {
154     iocb->io.aio_data = fd;
155     iocb->io.aio_fildes = fd;
156     iocb->io.aio_buf =
157     (nev & EV_READ ? POLLIN : 0)
158     | (nev & EV_WRITE ? POLLOUT : 0);
159    
160     /* queue iocb up for io_submit */
161     /* this assumes we only ever get one call per fd per loop iteration */
162     ++linuxaio_submitcnt;
163     array_needsize (struct iocb *, linuxaio_submits, linuxaio_submitmax, linuxaio_submitcnt, array_needsize_noinit);
164     linuxaio_submits [linuxaio_submitcnt - 1] = &iocb->io;
165     }
166     }
167    
168     static void
169     linuxaio_parse_events (EV_P_ struct io_event *ev, int nr)
170     {
171     while (nr)
172     {
173     int fd = ev->data;
174     int res = ev->res;
175    
176     assert (("libev: iocb fd must be in-bounds", fd >= 0 && fd < anfdxmax));
177    
178     /* linux aio is oneshot: rearm fd */
179     linuxaio_iocbps [fd]->io.aio_buf = 0;
180     anfds [fd].events = 0;
181     fd_change (EV_A_ fd, 0);
182    
183     /* feed events, we do not expect or handle POLLNVAL */
184     if (ecb_expect_false (res & POLLNVAL))
185     fd_kill (EV_A_ fd);
186     else
187     fd_event (
188     EV_A_
189     fd,
190     (res & (POLLOUT | POLLERR | POLLHUP) ? EV_WRITE : 0)
191     | (res & (POLLIN | POLLERR | POLLHUP) ? EV_READ : 0)
192     );
193    
194     --nr;
195     ++ev;
196     }
197     }
198    
199     /* get any events from ringbuffer, return true if any were handled */
200     static int
201     linuxaio_get_events_from_ring (EV_P)
202     {
203     struct aio_ring *ring = (struct aio_ring *)linuxaio_ctx;
204    
205     ECB_MEMORY_FENCE_ACQUIRE;
206    
207     unsigned head = ring->head;
208     unsigned tail = *(volatile unsigned *)&ring->tail;
209    
210     if (ring->magic != AIO_RING_MAGIC
211     || ring->incompat_features != AIO_RING_INCOMPAT_FEATURES
212     || ring->header_length != sizeof (struct aio_ring) /* TODO: or use it to find io_event[0]? */
213     || head == tail)
214     return 0;
215    
216     /* parse all available events, but only once, to avoid starvation */
217     if (tail > head) /* normal case around */
218     linuxaio_parse_events (EV_A_ ring->io_events + head, tail - head);
219     else
220     {
221     /* wrapped around */
222     linuxaio_parse_events (EV_A_ ring->io_events + head, ring->nr - head);
223     linuxaio_parse_events (EV_A_ ring->io_events, tail);
224     }
225    
226     ring->head = tail;
227    
228     return 1;
229     }
230    
231     /* read at least one event from kernel, or timeout */
232     inline_size
233     void
234     linuxaio_get_events (EV_P_ ev_tstamp timeout)
235     {
236     struct timespec ts;
237     struct io_event ioev;
238     int res;
239    
240     if (linuxaio_get_events_from_ring (EV_A))
241     return;
242    
243     /* no events, so wait for at least one, then poll ring buffer again */
244     /* this degraded to one event per loop iteration */
245     /* if the ring buffer changes layout, but so be it */
246    
247     ts.tv_sec = (long)timeout;
248     ts.tv_nsec = (long)((timeout - ts.tv_sec) * 1e9);
249    
250     res = ev_io_getevents (linuxaio_ctx, 1, 1, &ioev, &ts);
251    
252     if (res < 0)
253     ev_syserr ("(libev) io_getevents");
254     else if (res)
255     {
256     /* at least one event received, handle it and any remaining ones in the ring buffer */
257     linuxaio_parse_events (EV_A_ &ioev, 1);
258     linuxaio_get_events_from_ring (EV_A);
259     }
260     }
261    
262     static void
263     linuxaio_poll (EV_P_ ev_tstamp timeout)
264     {
265     int submitted;
266    
267     /* first phase: submit new iocbs */
268    
269     /* io_submit might return less than the requested number of iocbs */
270     /* this is, afaics, only because of errors, but we go by the book and use a loop, */
271     /* which allows us to pinpoint the errornous iocb */
272     for (submitted = 0; submitted < linuxaio_submitcnt; )
273     {
274     int res = ev_io_submit (linuxaio_ctx, linuxaio_submitcnt - submitted, linuxaio_submits + submitted);
275    
276     if (res < 0)
277     if (errno == EAGAIN)
278     {
279     /* This happens when the ring buffer is full, at least. I assume this means
280     * that the event was queued synchronously during io_submit, and thus
281     * the buffer overflowd.
282     * In this case, we just try next loop iteration.
283     */
284     memcpy (linuxaio_submits, linuxaio_submits + submitted, (linuxaio_submitcnt - submitted) * sizeof (*linuxaio_submits));
285     linuxaio_submitcnt -= submitted;
286     timeout = 0;
287     break;
288     }
289     else
290     /* TODO: we get EAGAIN when the ring buffer is full for some reason */
291     /* TODO: should we always just try next time? */
292     ev_syserr ("(libev) io_submit");
293    
294     submitted += res;
295     }
296    
297     linuxaio_submitcnt = 0;
298    
299     /* second phase: fetch and parse events */
300    
301     linuxaio_get_events (EV_A_ timeout);
302     }
303    
304     inline_size
305     int
306     linuxaio_init (EV_P_ int flags)
307     {
308     /* would be great to have a nice test for IOCB_CMD_POLL instead */
309     if (ev_linux_version () < 0x041200) /* 4.18 introduced IOCB_CMD_POLL */
310     return 0;
311    
312     if (ev_io_setup (EV_LINUXAIO_DEPTH, &linuxaio_ctx) < 0)
313     return 0;
314    
315     backend_modify = linuxaio_modify;
316     backend_poll = linuxaio_poll;
317    
318     linuxaio_iocbpmax = 0;
319     linuxaio_iocbps = 0;
320    
321     linuxaio_submits = 0;
322     linuxaio_submitmax = 0;
323     linuxaio_submitcnt = 0;
324    
325     return EVBACKEND_LINUXAIO;
326     }
327    
328     inline_size
329     void
330     linuxaio_destroy (EV_P)
331     {
332     linuxaio_free_iocbp (EV_A);
333     ev_io_destroy (linuxaio_ctx);
334     }
335    
336     inline_size
337     void
338     linuxaio_fork (EV_P)
339     {
340     abort ();//D
341     }
342