Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r-- | io_uring/io_uring.c | 418
1 file changed, 271 insertions(+), 147 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index 8840cf3e20f2..58ac13b69dc8 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req); static void io_queue_sqe(struct io_kiocb *req); static void io_move_task_work_from_local(struct io_ring_ctx *ctx); static void __io_submit_flush_completions(struct io_ring_ctx *ctx); +static __cold void io_fallback_tw(struct io_uring_task *tctx); static struct kmem_cache *req_cachep; @@ -167,7 +168,8 @@ EXPORT_SYMBOL(io_uring_get_socket); static inline void io_submit_flush_completions(struct io_ring_ctx *ctx) { - if (!wq_list_empty(&ctx->submit_state.compl_reqs)) + if (!wq_list_empty(&ctx->submit_state.compl_reqs) || + ctx->submit_state.cqes_count) __io_submit_flush_completions(ctx); } @@ -325,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) spin_lock_init(&ctx->rsrc_ref_lock); INIT_LIST_HEAD(&ctx->rsrc_ref_list); INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work); + init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw); init_llist_head(&ctx->rsrc_put_llist); init_llist_head(&ctx->work_llist); INIT_LIST_HEAD(&ctx->tctx_list); @@ -495,7 +498,7 @@ static void io_eventfd_ops(struct rcu_head *rcu) int ops = atomic_xchg(&ev_fd->ops, 0); if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT)) - eventfd_signal(ev_fd->cq_ev_fd, 1); + eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE); /* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback * ordering in a race but if references are 0 we know we have to free @@ -531,11 +534,11 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx) goto out; if (likely(eventfd_signal_allowed())) { - eventfd_signal(ev_fd->cq_ev_fd, 1); + eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE); } else { atomic_inc(&ev_fd->refs); if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops)) - call_rcu(&ev_fd->rcu, io_eventfd_ops); + call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops); else atomic_dec(&ev_fd->refs); } @@ -569,45 +572,86 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx) void __io_commit_cqring_flush(struct io_ring_ctx *ctx) { - if (ctx->off_timeout_used || ctx->drain_active) { + if (ctx->off_timeout_used) + io_flush_timeouts(ctx); + if (ctx->drain_active) { spin_lock(&ctx->completion_lock); - if (ctx->off_timeout_used) - io_flush_timeouts(ctx); - if (ctx->drain_active) - io_queue_deferred(ctx); + io_queue_deferred(ctx); spin_unlock(&ctx->completion_lock); } if (ctx->has_evfd) io_eventfd_flush_signal(ctx); } -static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx) +static inline void __io_cq_lock(struct io_ring_ctx *ctx) + __acquires(ctx->completion_lock) +{ + if (!ctx->task_complete) + spin_lock(&ctx->completion_lock); +} + +static inline void __io_cq_unlock(struct io_ring_ctx *ctx) +{ + if (!ctx->task_complete) + spin_unlock(&ctx->completion_lock); +} + +static inline void io_cq_lock(struct io_ring_ctx *ctx) + __acquires(ctx->completion_lock) +{ + spin_lock(&ctx->completion_lock); +} + +static inline void io_cq_unlock(struct io_ring_ctx *ctx) + __releases(ctx->completion_lock) { + spin_unlock(&ctx->completion_lock); +} + +/* keep it inlined for io_submit_flush_completions() */ +static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) + __releases(ctx->completion_lock) +{ + io_commit_cqring(ctx); + __io_cq_unlock(ctx); io_commit_cqring_flush(ctx); io_cqring_wake(ctx); } -static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx) +void io_cq_unlock_post(struct 
io_ring_ctx *ctx) __releases(ctx->completion_lock) { io_commit_cqring(ctx); spin_unlock(&ctx->completion_lock); - io_cqring_ev_posted(ctx); + io_commit_cqring_flush(ctx); + io_cqring_wake(ctx); } -void io_cq_unlock_post(struct io_ring_ctx *ctx) +/* Returns true if there are no backlogged entries after the flush */ +static void io_cqring_overflow_kill(struct io_ring_ctx *ctx) { - __io_cq_unlock_post(ctx); + struct io_overflow_cqe *ocqe; + LIST_HEAD(list); + + io_cq_lock(ctx); + list_splice_init(&ctx->cq_overflow_list, &list); + clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); + io_cq_unlock(ctx); + + while (!list_empty(&list)) { + ocqe = list_first_entry(&list, struct io_overflow_cqe, list); + list_del(&ocqe->list); + kfree(ocqe); + } } /* Returns true if there are no backlogged entries after the flush */ -static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) +static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx) { - bool all_flushed; size_t cqe_size = sizeof(struct io_uring_cqe); - if (!force && __io_cqring_events(ctx) == ctx->cq_entries) - return false; + if (__io_cqring_events(ctx) == ctx->cq_entries) + return; if (ctx->flags & IORING_SETUP_CQE32) cqe_size <<= 1; @@ -617,43 +661,36 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force) struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true); struct io_overflow_cqe *ocqe; - if (!cqe && !force) + if (!cqe) break; ocqe = list_first_entry(&ctx->cq_overflow_list, struct io_overflow_cqe, list); - if (cqe) - memcpy(cqe, &ocqe->cqe, cqe_size); - else - io_account_cq_overflow(ctx); - + memcpy(cqe, &ocqe->cqe, cqe_size); list_del(&ocqe->list); kfree(ocqe); } - all_flushed = list_empty(&ctx->cq_overflow_list); - if (all_flushed) { + if (list_empty(&ctx->cq_overflow_list)) { clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq); atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags); } - io_cq_unlock_post(ctx); - return all_flushed; } -static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx) +static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx) { - bool ret = true; - - if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { - /* iopoll syncs against uring_lock, not completion_lock */ - if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_lock(&ctx->uring_lock); - ret = __io_cqring_overflow_flush(ctx, false); - if (ctx->flags & IORING_SETUP_IOPOLL) - mutex_unlock(&ctx->uring_lock); - } + /* iopoll syncs against uring_lock, not completion_lock */ + if (ctx->flags & IORING_SETUP_IOPOLL) + mutex_lock(&ctx->uring_lock); + __io_cqring_overflow_flush(ctx); + if (ctx->flags & IORING_SETUP_IOPOLL) + mutex_unlock(&ctx->uring_lock); +} - return ret; +static void io_cqring_overflow_flush(struct io_ring_ctx *ctx) +{ + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) + io_cqring_do_overflow_flush(ctx); } void __io_put_task(struct task_struct *task, int nr) @@ -778,11 +815,14 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow) return &rings->cqes[off]; } -bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, + u32 cflags) { struct io_uring_cqe *cqe; + if (!ctx->task_complete) + lockdep_assert_held(&ctx->completion_lock); + ctx->cq_extra++; /* @@ -804,34 +844,100 @@ bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags } return true; } - - if (allow_overflow) - return io_cqring_event_overflow(ctx, 
user_data, res, cflags, 0, 0); - return false; } -bool io_post_aux_cqe(struct io_ring_ctx *ctx, - u64 user_data, s32 res, u32 cflags, - bool allow_overflow) +static void __io_flush_post_cqes(struct io_ring_ctx *ctx) + __must_hold(&ctx->uring_lock) +{ + struct io_submit_state *state = &ctx->submit_state; + unsigned int i; + + lockdep_assert_held(&ctx->uring_lock); + for (i = 0; i < state->cqes_count; i++) { + struct io_uring_cqe *cqe = &state->cqes[i]; + + if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + spin_unlock(&ctx->completion_lock); + } else { + io_cqring_event_overflow(ctx, cqe->user_data, + cqe->res, cqe->flags, 0, 0); + } + } + } + state->cqes_count = 0; +} + +static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags, + bool allow_overflow) { bool filled; io_cq_lock(ctx); - filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow); + filled = io_fill_cqe_aux(ctx, user_data, res, cflags); + if (!filled && allow_overflow) + filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0); + io_cq_unlock_post(ctx); return filled; } -static void __io_req_complete_put(struct io_kiocb *req) +bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags) { + return __io_post_aux_cqe(ctx, user_data, res, cflags, true); +} + +bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags, + bool allow_overflow) +{ + struct io_uring_cqe *cqe; + unsigned int length; + + if (!defer) + return __io_post_aux_cqe(ctx, user_data, res, cflags, allow_overflow); + + length = ARRAY_SIZE(ctx->submit_state.cqes); + + lockdep_assert_held(&ctx->uring_lock); + + if (ctx->submit_state.cqes_count == length) { + __io_cq_lock(ctx); + __io_flush_post_cqes(ctx); + /* no need to flush - flush is deferred */ + __io_cq_unlock_post(ctx); + } + + /* For defered completions this is not as strict as it is otherwise, + * however it's main job is to prevent unbounded posted completions, + * and in that it works just as well. + */ + if (!allow_overflow && test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) + return false; + + cqe = &ctx->submit_state.cqes[ctx->submit_state.cqes_count++]; + cqe->user_data = user_data; + cqe->res = res; + cqe->flags = cflags; + return true; +} + +static void __io_req_complete_post(struct io_kiocb *req) +{ + struct io_ring_ctx *ctx = req->ctx; + + io_cq_lock(ctx); + if (!(req->flags & REQ_F_CQE_SKIP)) + io_fill_cqe_req(ctx, req); + /* * If we're the last reference to this request, add to our locked * free_list cache. 
*/ if (req_ref_put_and_test(req)) { - struct io_ring_ctx *ctx = req->ctx; - if (req->flags & IO_REQ_LINK_FLAGS) { if (req->flags & IO_DISARM_MASK) io_disarm_next(req); @@ -852,38 +958,38 @@ static void __io_req_complete_put(struct io_kiocb *req) wq_list_add_head(&req->comp_list, &ctx->locked_free_list); ctx->locked_free_nr++; } -} - -void __io_req_complete_post(struct io_kiocb *req) -{ - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(req->ctx, req); - __io_req_complete_put(req); -} - -void io_req_complete_post(struct io_kiocb *req) -{ - struct io_ring_ctx *ctx = req->ctx; - - io_cq_lock(ctx); - __io_req_complete_post(req); io_cq_unlock_post(ctx); } -inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags) +void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags) { - io_req_complete_post(req); + if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) { + req->io_task_work.func = io_req_task_complete; + io_req_task_work_add(req); + } else if (!(issue_flags & IO_URING_F_UNLOCKED) || + !(req->ctx->flags & IORING_SETUP_IOPOLL)) { + __io_req_complete_post(req); + } else { + struct io_ring_ctx *ctx = req->ctx; + + mutex_lock(&ctx->uring_lock); + __io_req_complete_post(req); + mutex_unlock(&ctx->uring_lock); + } } -void io_req_complete_failed(struct io_kiocb *req, s32 res) +void io_req_defer_failed(struct io_kiocb *req, s32 res) + __must_hold(&ctx->uring_lock) { const struct io_op_def *def = &io_op_defs[req->opcode]; + lockdep_assert_held(&req->ctx->uring_lock); + req_set_fail(req); io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED)); if (def->fail) def->fail(req); - io_req_complete_post(req); + io_req_complete_defer(req); } /* @@ -983,9 +1089,9 @@ static void __io_req_find_next_prep(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; - io_cq_lock(ctx); + spin_lock(&ctx->completion_lock); io_disarm_next(req); - io_cq_unlock_post(ctx); + spin_unlock(&ctx->completion_lock); } static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req) @@ -1084,10 +1190,17 @@ void tctx_task_work(struct callback_head *cb) struct io_uring_task *tctx = container_of(cb, struct io_uring_task, task_work); struct llist_node fake = {}; - struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake); + struct llist_node *node; unsigned int loops = 1; - unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL); + unsigned int count; + + if (unlikely(current->flags & PF_EXITING)) { + io_fallback_tw(tctx); + return; + } + node = io_llist_xchg(&tctx->task_list, &fake); + count = handle_tw_list(node, &ctx, &uring_locked, NULL); node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL); while (node != &fake) { loops++; @@ -1105,6 +1218,20 @@ void tctx_task_work(struct callback_head *cb) trace_io_uring_task_work_run(tctx, count, loops); } +static __cold void io_fallback_tw(struct io_uring_task *tctx) +{ + struct llist_node *node = llist_del_all(&tctx->task_list); + struct io_kiocb *req; + + while (node) { + req = container_of(node, struct io_kiocb, io_task_work.node); + node = node->next; + if (llist_add(&req->io_task_work.node, + &req->ctx->fallback_llist)) + schedule_delayed_work(&req->ctx->fallback_work, 1); + } +} + static void io_req_local_work_add(struct io_kiocb *req) { struct io_ring_ctx *ctx = req->ctx; @@ -1127,11 +1254,10 @@ static void io_req_local_work_add(struct io_kiocb *req) __io_cqring_wake(ctx); } -static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local) +void __io_req_task_work_add(struct io_kiocb *req, 
bool allow_local) { struct io_uring_task *tctx = req->task->io_uring; struct io_ring_ctx *ctx = req->ctx; - struct llist_node *node; if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) { io_req_local_work_add(req); @@ -1148,20 +1274,7 @@ static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method))) return; - node = llist_del_all(&tctx->task_list); - - while (node) { - req = container_of(node, struct io_kiocb, io_task_work.node); - node = node->next; - if (llist_add(&req->io_task_work.node, - &req->ctx->fallback_llist)) - schedule_delayed_work(&req->ctx->fallback_work, 1); - } -} - -void io_req_task_work_add(struct io_kiocb *req) -{ - __io_req_task_work_add(req, true); + io_fallback_tw(tctx); } static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) @@ -1237,23 +1350,10 @@ int io_run_local_work(struct io_ring_ctx *ctx) return ret; } -static void io_req_tw_post(struct io_kiocb *req, bool *locked) -{ - io_req_complete_post(req); -} - -void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags) -{ - io_req_set_res(req, res, cflags); - req->io_task_work.func = io_req_tw_post; - io_req_task_work_add(req); -} - static void io_req_task_cancel(struct io_kiocb *req, bool *locked) { - /* not needed for normal modes, but SQPOLL depends on it */ io_tw_lock(req->ctx, locked); - io_req_complete_failed(req, req->cqe.res); + io_req_defer_failed(req, req->cqe.res); } void io_req_task_submit(struct io_kiocb *req, bool *locked) @@ -1263,7 +1363,7 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked) if (likely(!(req->task->flags & PF_EXITING))) io_queue_sqe(req); else - io_req_complete_failed(req, -EFAULT); + io_req_defer_failed(req, -EFAULT); } void io_req_task_queue_fail(struct io_kiocb *req, int ret) @@ -1343,18 +1443,31 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx) struct io_wq_work_node *node, *prev; struct io_submit_state *state = &ctx->submit_state; - io_cq_lock(ctx); + __io_cq_lock(ctx); + /* must come first to preserve CQE ordering in failure cases */ + if (state->cqes_count) + __io_flush_post_cqes(ctx); wq_list_for_each(node, prev, &state->compl_reqs) { struct io_kiocb *req = container_of(node, struct io_kiocb, comp_list); - if (!(req->flags & REQ_F_CQE_SKIP)) - __io_fill_cqe_req(ctx, req); + if (!(req->flags & REQ_F_CQE_SKIP) && + unlikely(!__io_fill_cqe_req(ctx, req))) { + if (ctx->task_complete) { + spin_lock(&ctx->completion_lock); + io_req_cqe_overflow(req); + spin_unlock(&ctx->completion_lock); + } else { + io_req_cqe_overflow(req); + } + } } __io_cq_unlock_post(ctx); - io_free_batch_list(ctx, state->compl_reqs.first); - INIT_WQ_LIST(&state->compl_reqs); + if (!wq_list_empty(&ctx->submit_state.compl_reqs)) { + io_free_batch_list(ctx, state->compl_reqs.first); + INIT_WQ_LIST(&state->compl_reqs); + } } /* @@ -1420,7 +1533,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) check_cq = READ_ONCE(ctx->check_cq); if (unlikely(check_cq)) { if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT)) - __io_cqring_overflow_flush(ctx, false); + __io_cqring_overflow_flush(ctx); /* * Similarly do not spin if we have not informed the user of any * dropped CQE. @@ -1476,16 +1589,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min) void io_req_task_complete(struct io_kiocb *req, bool *locked) { - if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) { - unsigned issue_flags = *locked ? 
0 : IO_URING_F_UNLOCKED; - - req->cqe.flags |= io_put_kbuf(req, issue_flags); - } - if (*locked) io_req_complete_defer(req); else - io_req_complete_post(req); + io_req_complete_post(req, IO_URING_F_UNLOCKED); } /* @@ -1635,6 +1742,7 @@ static u32 io_get_sequence(struct io_kiocb *req) } static __cold void io_drain_req(struct io_kiocb *req) + __must_hold(&ctx->uring_lock) { struct io_ring_ctx *ctx = req->ctx; struct io_defer_entry *de; @@ -1655,7 +1763,7 @@ queue: ret = io_req_prep_async(req); if (ret) { fail: - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); return; } io_prep_async_link(req); @@ -1752,12 +1860,12 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) if (issue_flags & IO_URING_F_COMPLETE_DEFER) io_req_complete_defer(req); else - io_req_complete_post(req); + io_req_complete_post(req, issue_flags); } else if (ret != IOU_ISSUE_SKIP_COMPLETE) return ret; /* If the op doesn't have a file, we're not polling for it */ - if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file) + if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue) io_iopoll_req_issued(req, issue_flags); return 0; @@ -1766,9 +1874,8 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags) int io_poll_issue(struct io_kiocb *req, bool *locked) { io_tw_lock(req->ctx, locked); - if (unlikely(req->task->flags & PF_EXITING)) - return -EFAULT; - return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT); + return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT| + IO_URING_F_COMPLETE_DEFER); } struct io_wq_work *io_wq_free_work(struct io_wq_work *work) @@ -1783,11 +1890,11 @@ void io_wq_submit_work(struct io_wq_work *work) { struct io_kiocb *req = container_of(work, struct io_kiocb, work); const struct io_op_def *def = &io_op_defs[req->opcode]; - unsigned int issue_flags = IO_URING_F_UNLOCKED; + unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ; bool needs_poll = false; int ret = 0, err = -ECANCELED; - /* one will be dropped by ->io_free_work() after returning to io-wq */ + /* one will be dropped by ->io_wq_free_work() after returning to io-wq */ if (!(req->flags & REQ_F_REFCOUNT)) __io_req_set_refcount(req, 2); else @@ -1885,7 +1992,7 @@ static void io_queue_async(struct io_kiocb *req, int ret) struct io_kiocb *linked_timeout; if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) { - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); return; } @@ -1935,14 +2042,14 @@ static void io_queue_sqe_fallback(struct io_kiocb *req) */ req->flags &= ~REQ_F_HARDLINK; req->flags |= REQ_F_LINK; - io_req_complete_failed(req, req->cqe.res); + io_req_defer_failed(req, req->cqe.res); } else if (unlikely(req->ctx->drain_active)) { io_drain_req(req); } else { int ret = io_req_prep_async(req); if (unlikely(ret)) - io_req_complete_failed(req, ret); + io_req_defer_failed(req, ret); else io_queue_iowq(req, NULL); } @@ -2378,7 +2485,14 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx, } if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS)) return -ETIME; - return 1; + + /* + * Run task_work after scheduling. If we got woken because of + * task_work being processed, run it now rather than let the caller + * do another wait loop. + */ + ret = io_run_task_work_sig(ctx); + return ret < 0 ? 
ret : 1; } /* @@ -2439,14 +2553,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, trace_io_uring_cqring_wait(ctx, min_events); do { - /* if we can't even flush overflow, don't wait for more */ - if (!io_cqring_overflow_flush(ctx)) { - ret = -EBUSY; - break; + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { + finish_wait(&ctx->cq_wait, &iowq.wq); + io_cqring_do_overflow_flush(ctx); } prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq, TASK_INTERRUPTIBLE); ret = io_cqring_wait_schedule(ctx, &iowq, timeout); + if (__io_cqring_events_user(ctx) >= min_events) + break; cond_resched(); } while (ret > 0); @@ -2594,8 +2709,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx) __io_sqe_buffers_unregister(ctx); if (ctx->file_data) __io_sqe_files_unregister(ctx); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); + io_cqring_overflow_kill(ctx); io_eventfd_unregister(ctx); io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free); io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free); @@ -2670,7 +2784,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait) * lock(&ep->mtx); * * Users may get EPOLLIN meanwhile seeing nothing in cqring, this - * pushs them to do the flush. + * pushes them to do the flush. */ if (io_cqring_events(ctx) || io_has_work(ctx)) @@ -2707,8 +2821,10 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb) /* * When @in_idle, we're in cancellation and it's racy to remove the * node. It'll be removed by the end of cancellation, just ignore it. + * tctx can be NULL if the queueing of this task_work raced with + * work cancelation off the exec path. */ - if (!atomic_read(&tctx->in_idle)) + if (tctx && !atomic_read(&tctx->in_idle)) io_uring_del_tctx_node((unsigned long)work->ctx); complete(&work->completion); } @@ -2736,6 +2852,12 @@ static __cold void io_ring_exit_work(struct work_struct *work) * as nobody else will be looking for them. 
*/ do { + if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) { + mutex_lock(&ctx->uring_lock); + io_cqring_overflow_kill(ctx); + mutex_unlock(&ctx->uring_lock); + } + if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) io_move_task_work_from_local(ctx); @@ -2801,8 +2923,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) mutex_lock(&ctx->uring_lock); percpu_ref_kill(&ctx->refs); - if (ctx->rings) - __io_cqring_overflow_flush(ctx, true); xa_for_each(&ctx->personalities, index, creds) io_unregister_personality(ctx, index); if (ctx->rings) @@ -2869,7 +2989,7 @@ static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx, while (!list_empty(&list)) { de = list_first_entry(&list, struct io_defer_entry, list); list_del_init(&de->list); - io_req_complete_failed(de->req, -ECANCELED); + io_req_task_queue_fail(de->req, -ECANCELED); kfree(de); } return true; @@ -3444,6 +3564,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p, if (!ctx) return -ENOMEM; + if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) && + !(ctx->flags & IORING_SETUP_IOPOLL) && + !(ctx->flags & IORING_SETUP_SQPOLL)) + ctx->task_complete = true; + /* * When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user * space applications don't need to do io completion events @@ -3895,8 +4020,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode, return -EEXIST; if (ctx->restricted) { - if (opcode >= IORING_REGISTER_LAST) - return -EINVAL; opcode = array_index_nospec(opcode, IORING_REGISTER_LAST); if (!test_bit(opcode, ctx->restrictions.register_op)) return -EACCES; @@ -4052,6 +4175,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, long ret = -EBADF; struct fd f; + if (opcode >= IORING_REGISTER_LAST) + return -EINVAL; + f = fdget(fd); if (!f.file) return -EBADF; @@ -4062,8 +4188,6 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode, ctx = f.file->private_data; - io_run_task_work_ctx(ctx); - mutex_lock(&ctx->uring_lock); ret = __io_uring_register(ctx, opcode, arg, nr_args); mutex_unlock(&ctx->uring_lock); |
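
The io_uring_create() hunk above sets ctx->task_complete for rings created with IORING_SETUP_DEFER_TASKRUN but without IOPOLL or SQPOLL; that flag is what lets the new __io_cq_lock()/__io_cq_unlock() helpers skip completion_lock and lets io_req_complete_post() punt io-wq completions to task_work instead of posting them directly. A minimal userspace sketch of opening a ring in that mode follows. It assumes liburing 2.3 or newer and a kernel that accepts both setup flags, and it is an illustration rather than anything taken from this patch.

/*
 * Minimal sketch: create a ring in the single-issuer, deferred-task_work
 * mode that the task_complete path above targets. Assumes liburing >= 2.3
 * and a kernel new enough to accept both setup flags.
 */
#include <liburing.h>
#include <stdio.h>
#include <string.h>

int main(void)
{
        struct io_uring ring;
        struct io_uring_params p;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        int ret;

        memset(&p, 0, sizeof(p));
        p.flags = IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN;

        ret = io_uring_queue_init_params(8, &ring, &p);
        if (ret < 0) {
                /* Older kernels reject these flags; nothing to demo then. */
                fprintf(stderr, "queue_init: %s\n", strerror(-ret));
                return 1;
        }

        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_nop(sqe);
        io_uring_submit(&ring);

        /*
         * With DEFER_TASKRUN, deferred completion work only runs when this
         * task enters the kernel to reap CQEs, e.g. via wait_cqe() below.
         */
        ret = io_uring_wait_cqe(&ring, &cqe);
        if (!ret) {
                printf("nop completed, res=%d\n", cqe->res);
                io_uring_cqe_seen(&ring, cqe);
        }

        io_uring_queue_exit(&ring);
        return 0;
}

DEFER_TASKRUN ties completion reaping to the task that set the ring up, which is exactly why the kernel can elide completion_lock on the task_complete path.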
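
The io_aux_cqe()/__io_flush_post_cqes() hunks batch multishot completions in submit_state.cqes[] and publish them in one go instead of taking the CQ lock for every entry. The fragment below is a standalone userspace analogue of that staging pattern, not kernel code; the names (struct fake_cq, stage_cqe(), flush_staged()) are invented for illustration.

/*
 * Standalone analogue of the deferred-CQE batching added above
 * (io_aux_cqe() staging into submit_state.cqes[] and
 * __io_flush_post_cqes() draining it). All names here are made up.
 * Build with: cc -pthread batch.c
 */
#include <pthread.h>
#include <stdio.h>

#define BATCH 16

struct fake_cqe { unsigned long long user_data; int res; unsigned flags; };

struct fake_cq {
        pthread_mutex_t lock;          /* stands in for completion_lock */
        struct fake_cqe staged[BATCH]; /* stands in for submit_state.cqes[] */
        unsigned nr_staged;
        unsigned long long posted;     /* CQEs "published" so far */
};

/*
 * Drain the staging buffer under the lock: one lock round trip per batch
 * instead of one per CQE.
 */
static void flush_staged(struct fake_cq *cq)
{
        unsigned i;

        if (!cq->nr_staged)
                return;
        pthread_mutex_lock(&cq->lock);
        for (i = 0; i < cq->nr_staged; i++)
                cq->posted++;   /* real code copies into the CQ ring here */
        pthread_mutex_unlock(&cq->lock);
        cq->nr_staged = 0;
}

/*
 * Called only by the single issuer/completer task, so staging itself needs
 * no lock; flush when the buffer fills up.
 */
static void stage_cqe(struct fake_cq *cq, unsigned long long ud, int res)
{
        if (cq->nr_staged == BATCH)
                flush_staged(cq);
        cq->staged[cq->nr_staged++] =
                (struct fake_cqe){ .user_data = ud, .res = res };
}

int main(void)
{
        struct fake_cq cq = { .lock = PTHREAD_MUTEX_INITIALIZER };
        unsigned long long i;

        for (i = 0; i < 100; i++)
                stage_cqe(&cq, i, 0);
        flush_staged(&cq);      /* mirrors io_submit_flush_completions() */
        printf("posted %llu cqes\n", cq.posted);
        return 0;
}

The kernel version additionally has to fall back to the overflow path when the CQ ring itself is full, which is what the ctx->task_complete branches around io_cqring_event_overflow() in __io_flush_post_cqes() handle.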
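
The eventfd hunks switch the internal signalling to eventfd_signal_mask() with EPOLL_URING_WAKE, which is intended to let poll waiters recognise io_uring-originated wakeups and avoid waking the ring that generated them. Nothing changes at the userspace API level; the sketch below only shows the registration path those hunks sit behind, again as a hedged illustration with error handling trimmed.

/*
 * Userspace view of the eventfd path touched above: register an eventfd so
 * CQE arrival can be observed from poll/epoll loops. Error handling is
 * deliberately trimmed.
 */
#include <liburing.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <unistd.h>

int main(void)
{
        struct io_uring ring;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        eventfd_t val;
        int efd;

        efd = eventfd(0, 0);
        io_uring_queue_init(8, &ring, 0);
        io_uring_register_eventfd(&ring, efd);

        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_nop(sqe);
        io_uring_submit(&ring);

        /* The kernel bumps the registered eventfd when the CQE is posted. */
        eventfd_read(efd, &val);
        printf("eventfd fired, count=%llu\n", (unsigned long long)val);

        /* The CQE itself still has to be reaped from the CQ ring. */
        if (!io_uring_wait_cqe(&ring, &cqe))
                io_uring_cqe_seen(&ring, cqe);

        io_uring_unregister_eventfd(&ring);
        io_uring_queue_exit(&ring);
        close(efd);
        return 0;
}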