summaryrefslogtreecommitdiff
path: root/util/fdmon-io_uring.c
blob: 1d14177df07fa58c8e3cfb18d608c29cf6dc3aa2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Linux io_uring file descriptor monitoring
 *
 * The Linux io_uring API supports file descriptor monitoring with a few
 * advantages over existing APIs like poll(2) and epoll(7):
 *
 * 1. Userspace polling of events is possible because the completion queue (cq
 *    ring) is shared between the kernel and userspace.  This allows
 *    applications that rely on userspace polling to also monitor file
 *    descriptors in the same userspace polling loop.
 *
 * 2. Submission and completion is batched and done together in a single system
 *    call.  This minimizes the number of system calls.
 *
 * 3. File descriptor monitoring is O(1) like epoll(7) so it scales better than
 *    poll(2).
 *
 * 4. Nanosecond timeouts are supported so it requires fewer syscalls than
 *    epoll(7).
 *
 * This code only monitors file descriptors and does not do asynchronous disk
 * I/O.  Implementing disk I/O efficiently has other requirements and should
 * use a separate io_uring so it does not make sense to unify the code.
 *
 * File descriptor monitoring is implemented using the following operations:
 *
 * 1. IORING_OP_POLL_ADD - adds a file descriptor to be monitored.
 * 2. IORING_OP_POLL_REMOVE - removes a file descriptor being monitored.  When
 *    the poll mask changes for a file descriptor it is first removed and then
 *    re-added with the new poll mask, so this operation is also used as part
 *    of modifying an existing monitored file descriptor.
 * 3. IORING_OP_TIMEOUT - added every time a blocking syscall is made to wait
 *    for events.  This operation self-cancels if another event completes
 *    before the timeout.
 *
 * io_uring calls the submission queue the "sq ring" and the completion queue
 * the "cq ring".  Ring entries are called "sqe" and "cqe", respectively.
 *
 * The code is structured so that sq/cq rings are only modified within
 * fdmon_io_uring_wait().  Changes to AioHandlers are made by enqueuing them on
 * ctx->submit_list so that fdmon_io_uring_wait() can submit IORING_OP_POLL_ADD
 * and/or IORING_OP_POLL_REMOVE sqes for them.
 */

#include "qemu/osdep.h"
#include <poll.h>
#include "qemu/rcu_queue.h"
#include "aio-posix.h"

enum {
    FDMON_IO_URING_ENTRIES  = 128, /* sq/cq ring size */

    /* AioHandler::flags */
    FDMON_IO_URING_PENDING  = (1 << 0),
    FDMON_IO_URING_ADD      = (1 << 1),
    FDMON_IO_URING_REMOVE   = (1 << 2),
};

static inline int poll_events_from_pfd(int pfd_events)
{
    return (pfd_events & G_IO_IN ? POLLIN : 0) |
           (pfd_events & G_IO_OUT ? POLLOUT : 0) |
           (pfd_events & G_IO_HUP ? POLLHUP : 0) |
           (pfd_events & G_IO_ERR ? POLLERR : 0);
}

static inline int pfd_events_from_poll(int poll_events)
{
    return (poll_events & POLLIN ? G_IO_IN : 0) |
           (poll_events & POLLOUT ? G_IO_OUT : 0) |
           (poll_events & POLLHUP ? G_IO_HUP : 0) |
           (poll_events & POLLERR ? G_IO_ERR : 0);
}

/*
 * Returns an sqe for submitting a request.  Only be called within
 * fdmon_io_uring_wait().
 */
static struct io_uring_sqe *get_sqe(AioContext *ctx)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    int ret;

    if (likely(sqe)) {
        return sqe;
    }

    /* No free sqes left, submit pending sqes first */
    do {
        ret = io_uring_submit(ring);
    } while (ret == -EINTR);

    assert(ret > 1);
    sqe = io_uring_get_sqe(ring);
    assert(sqe);
    return sqe;
}

/* Atomically enqueue an AioHandler for sq ring submission */
static void enqueue(AioHandlerSList *head, AioHandler *node, unsigned flags)
{
    unsigned old_flags;

    old_flags = atomic_fetch_or(&node->flags, FDMON_IO_URING_PENDING | flags);
    if (!(old_flags & FDMON_IO_URING_PENDING)) {
        QSLIST_INSERT_HEAD_ATOMIC(head, node, node_submitted);
    }
}

/* Dequeue an AioHandler for sq ring submission.  Called by fill_sq_ring(). */
static AioHandler *dequeue(AioHandlerSList *head, unsigned *flags)
{
    AioHandler *node = QSLIST_FIRST(head);

    if (!node) {
        return NULL;
    }

    /* Doesn't need to be atomic since fill_sq_ring() moves the list */
    QSLIST_REMOVE_HEAD(head, node_submitted);

    /*
     * Don't clear FDMON_IO_URING_REMOVE.  It's sticky so it can serve two
     * purposes: telling fill_sq_ring() to submit IORING_OP_POLL_REMOVE and
     * telling process_cqe() to delete the AioHandler when its
     * IORING_OP_POLL_ADD completes.
     */
    *flags = atomic_fetch_and(&node->flags, ~(FDMON_IO_URING_PENDING |
                                              FDMON_IO_URING_ADD));
    return node;
}

static void fdmon_io_uring_update(AioContext *ctx,
                                  AioHandler *old_node,
                                  AioHandler *new_node)
{
    if (new_node) {
        enqueue(&ctx->submit_list, new_node, FDMON_IO_URING_ADD);
    }

    if (old_node) {
        /*
         * Deletion is tricky because IORING_OP_POLL_ADD and
         * IORING_OP_POLL_REMOVE are async.  We need to wait for the original
         * IORING_OP_POLL_ADD to complete before this handler can be freed
         * safely.
         *
         * It's possible that the file descriptor becomes ready and the
         * IORING_OP_POLL_ADD cqe is enqueued before IORING_OP_POLL_REMOVE is
         * submitted, too.
         *
         * Mark this handler deleted right now but don't place it on
         * ctx->deleted_aio_handlers yet.  Instead, manually fudge the list
         * entry to make QLIST_IS_INSERTED() think this handler has been
         * inserted and other code recognizes this AioHandler as deleted.
         *
         * Once the original IORING_OP_POLL_ADD completes we enqueue the
         * handler on the real ctx->deleted_aio_handlers list to be freed.
         */
        assert(!QLIST_IS_INSERTED(old_node, node_deleted));
        old_node->node_deleted.le_prev = &old_node->node_deleted.le_next;

        enqueue(&ctx->submit_list, old_node, FDMON_IO_URING_REMOVE);
    }
}

static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);
    int events = poll_events_from_pfd(node->pfd.events);

    io_uring_prep_poll_add(sqe, node->pfd.fd, events);
    io_uring_sqe_set_data(sqe, node);
}

static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
{
    struct io_uring_sqe *sqe = get_sqe(ctx);

    io_uring_prep_poll_remove(sqe, node);
}

/* Add a timeout that self-cancels when another cqe becomes ready */
static void add_timeout_sqe(AioContext *ctx, int64_t ns)
{
    struct io_uring_sqe *sqe;
    struct __kernel_timespec ts = {
        .tv_sec = ns / NANOSECONDS_PER_SECOND,
        .tv_nsec = ns % NANOSECONDS_PER_SECOND,
    };

    sqe = get_sqe(ctx);
    io_uring_prep_timeout(sqe, &ts, 1, 0);
}

/* Add sqes from ctx->submit_list for submission */
static void fill_sq_ring(AioContext *ctx)
{
    AioHandlerSList submit_list;
    AioHandler *node;
    unsigned flags;

    QSLIST_MOVE_ATOMIC(&submit_list, &ctx->submit_list);

    while ((node = dequeue(&submit_list, &flags))) {
        /* Order matters, just in case both flags were set */
        if (flags & FDMON_IO_URING_ADD) {
            add_poll_add_sqe(ctx, node);
        }
        if (flags & FDMON_IO_URING_REMOVE) {
            add_poll_remove_sqe(ctx, node);
        }
    }
}

/* Returns true if a handler became ready */
static bool process_cqe(AioContext *ctx,
                        AioHandlerList *ready_list,
                        struct io_uring_cqe *cqe)
{
    AioHandler *node = io_uring_cqe_get_data(cqe);
    unsigned flags;

    /* poll_timeout and poll_remove have a zero user_data field */
    if (!node) {
        return false;
    }

    /*
     * Deletion can only happen when IORING_OP_POLL_ADD completes.  If we race
     * with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
     * bit before IORING_OP_POLL_REMOVE is submitted.
     */
    flags = atomic_fetch_and(&node->flags, ~FDMON_IO_URING_REMOVE);
    if (flags & FDMON_IO_URING_REMOVE) {
        QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
        return false;
    }

    aio_add_ready_handler(ready_list, node, pfd_events_from_poll(cqe->res));

    /* IORING_OP_POLL_ADD is one-shot so we must re-arm it */
    add_poll_add_sqe(ctx, node);
    return true;
}

static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
{
    struct io_uring *ring = &ctx->fdmon_io_uring;
    struct io_uring_cqe *cqe;
    unsigned num_cqes = 0;
    unsigned num_ready = 0;
    unsigned head;

    io_uring_for_each_cqe(ring, head, cqe) {
        if (process_cqe(ctx, ready_list, cqe)) {
            num_ready++;
        }

        num_cqes++;
    }

    io_uring_cq_advance(ring, num_cqes);
    return num_ready;
}

static int fdmon_io_uring_wait(AioContext *ctx, AioHandlerList *ready_list,
                               int64_t timeout)
{
    unsigned wait_nr = 1; /* block until at least one cqe is ready */
    int ret;

    /* Fall back while external clients are disabled */
    if (atomic_read(&ctx->external_disable_cnt)) {
        return fdmon_poll_ops.wait(ctx, ready_list, timeout);
    }

    if (timeout == 0) {
        wait_nr = 0; /* non-blocking */
    } else if (timeout > 0) {
        add_timeout_sqe(ctx, timeout);
    }

    fill_sq_ring(ctx);

    do {
        ret = io_uring_submit_and_wait(&ctx->fdmon_io_uring, wait_nr);
    } while (ret == -EINTR);

    assert(ret >= 0);

    return process_cq_ring(ctx, ready_list);
}

static bool fdmon_io_uring_need_wait(AioContext *ctx)
{
    /* Have io_uring events completed? */
    if (io_uring_cq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Are there pending sqes to submit? */
    if (io_uring_sq_ready(&ctx->fdmon_io_uring)) {
        return true;
    }

    /* Do we need to process AioHandlers for io_uring changes? */
    if (!QSLIST_EMPTY_RCU(&ctx->submit_list)) {
        return true;
    }

    /* Are we falling back to fdmon-poll? */
    return atomic_read(&ctx->external_disable_cnt);
}

static const FDMonOps fdmon_io_uring_ops = {
    .update = fdmon_io_uring_update,
    .wait = fdmon_io_uring_wait,
    .need_wait = fdmon_io_uring_need_wait,
};

bool fdmon_io_uring_setup(AioContext *ctx)
{
    int ret;

    ret = io_uring_queue_init(FDMON_IO_URING_ENTRIES, &ctx->fdmon_io_uring, 0);
    if (ret != 0) {
        return false;
    }

    QSLIST_INIT(&ctx->submit_list);
    ctx->fdmon_ops = &fdmon_io_uring_ops;
    return true;
}

void fdmon_io_uring_destroy(AioContext *ctx)
{
    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
        AioHandler *node;

        io_uring_queue_exit(&ctx->fdmon_io_uring);

        /* Move handlers due to be removed onto the deleted list */
        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
            unsigned flags = atomic_fetch_and(&node->flags,
                    ~(FDMON_IO_URING_PENDING |
                      FDMON_IO_URING_ADD |
                      FDMON_IO_URING_REMOVE));

            if (flags & FDMON_IO_URING_REMOVE) {
                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
            }

            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
        }

        ctx->fdmon_ops = &fdmon_poll_ops;
    }
}