Diffstat (limited to 'io_uring/io_uring.c')
-rw-r--r--  io_uring/io_uring.c  418
1 file changed, 271 insertions(+), 147 deletions(-)
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 8840cf3e20f2..58ac13b69dc8 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -149,6 +149,7 @@ static void io_clean_op(struct io_kiocb *req);
static void io_queue_sqe(struct io_kiocb *req);
static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
+static __cold void io_fallback_tw(struct io_uring_task *tctx);
static struct kmem_cache *req_cachep;
@@ -167,7 +168,8 @@ EXPORT_SYMBOL(io_uring_get_socket);
static inline void io_submit_flush_completions(struct io_ring_ctx *ctx)
{
- if (!wq_list_empty(&ctx->submit_state.compl_reqs))
+ if (!wq_list_empty(&ctx->submit_state.compl_reqs) ||
+ ctx->submit_state.cqes_count)
__io_submit_flush_completions(ctx);
}
@@ -325,6 +327,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
spin_lock_init(&ctx->rsrc_ref_lock);
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
+ init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
init_llist_head(&ctx->rsrc_put_llist);
init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
@@ -495,7 +498,7 @@ static void io_eventfd_ops(struct rcu_head *rcu)
int ops = atomic_xchg(&ev_fd->ops, 0);
if (ops & BIT(IO_EVENTFD_OP_SIGNAL_BIT))
- eventfd_signal(ev_fd->cq_ev_fd, 1);
+ eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
/* IO_EVENTFD_OP_FREE_BIT may not be set here depending on callback
* ordering in a race but if references are 0 we know we have to free
@@ -531,11 +534,11 @@ static void io_eventfd_signal(struct io_ring_ctx *ctx)
goto out;
if (likely(eventfd_signal_allowed())) {
- eventfd_signal(ev_fd->cq_ev_fd, 1);
+ eventfd_signal_mask(ev_fd->cq_ev_fd, 1, EPOLL_URING_WAKE);
} else {
atomic_inc(&ev_fd->refs);
if (!atomic_fetch_or(BIT(IO_EVENTFD_OP_SIGNAL_BIT), &ev_fd->ops))
- call_rcu(&ev_fd->rcu, io_eventfd_ops);
+ call_rcu_hurry(&ev_fd->rcu, io_eventfd_ops);
else
atomic_dec(&ev_fd->refs);
}
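
The hunk above switches the eventfd signal to eventfd_signal_mask() with EPOLL_URING_WAKE, so a wakeup generated by io_uring itself can be told apart from a regular one and does not recursively re-trigger the ring that caused it, and uses call_rcu_hurry() so the deferred signal is not held back by lazy RCU. For context, a minimal sketch of the userspace side of such an eventfd, assuming liburing's io_uring_register_eventfd(); illustrative only, not part of the patch:

/* Illustrative only: userspace side of the eventfd the hunk above signals.
 * Assumes liburing; error handling trimmed. */
#include <errno.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <liburing.h>

static int setup_cq_eventfd(struct io_uring *ring)
{
	int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

	if (efd < 0)
		return -errno;
	/* CQE postings now signal efd; the EPOLL_URING_WAKE change above keeps
	 * an epoll set watching both the ring fd and efd from re-waking itself. */
	if (io_uring_register_eventfd(ring, efd) < 0) {
		close(efd);
		return -1;
	}
	return efd;
}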
@@ -569,45 +572,86 @@ static void io_eventfd_flush_signal(struct io_ring_ctx *ctx)
void __io_commit_cqring_flush(struct io_ring_ctx *ctx)
{
- if (ctx->off_timeout_used || ctx->drain_active) {
+ if (ctx->off_timeout_used)
+ io_flush_timeouts(ctx);
+ if (ctx->drain_active) {
spin_lock(&ctx->completion_lock);
- if (ctx->off_timeout_used)
- io_flush_timeouts(ctx);
- if (ctx->drain_active)
- io_queue_deferred(ctx);
+ io_queue_deferred(ctx);
spin_unlock(&ctx->completion_lock);
}
if (ctx->has_evfd)
io_eventfd_flush_signal(ctx);
}
-static inline void io_cqring_ev_posted(struct io_ring_ctx *ctx)
+static inline void __io_cq_lock(struct io_ring_ctx *ctx)
+ __acquires(ctx->completion_lock)
+{
+ if (!ctx->task_complete)
+ spin_lock(&ctx->completion_lock);
+}
+
+static inline void __io_cq_unlock(struct io_ring_ctx *ctx)
+{
+ if (!ctx->task_complete)
+ spin_unlock(&ctx->completion_lock);
+}
+
+static inline void io_cq_lock(struct io_ring_ctx *ctx)
+ __acquires(ctx->completion_lock)
+{
+ spin_lock(&ctx->completion_lock);
+}
+
+static inline void io_cq_unlock(struct io_ring_ctx *ctx)
+ __releases(ctx->completion_lock)
{
+ spin_unlock(&ctx->completion_lock);
+}
+
+/* keep it inlined for io_submit_flush_completions() */
+static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
+ __releases(ctx->completion_lock)
+{
+ io_commit_cqring(ctx);
+ __io_cq_unlock(ctx);
io_commit_cqring_flush(ctx);
io_cqring_wake(ctx);
}
-static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
+void io_cq_unlock_post(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
- io_cqring_ev_posted(ctx);
+ io_commit_cqring_flush(ctx);
+ io_cqring_wake(ctx);
}
-void io_cq_unlock_post(struct io_ring_ctx *ctx)
+/* Discard any backlogged overflow entries without posting them */
+static void io_cqring_overflow_kill(struct io_ring_ctx *ctx)
{
- __io_cq_unlock_post(ctx);
+ struct io_overflow_cqe *ocqe;
+ LIST_HEAD(list);
+
+ io_cq_lock(ctx);
+ list_splice_init(&ctx->cq_overflow_list, &list);
+ clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
+ io_cq_unlock(ctx);
+
+ while (!list_empty(&list)) {
+ ocqe = list_first_entry(&list, struct io_overflow_cqe, list);
+ list_del(&ocqe->list);
+ kfree(ocqe);
+ }
}
/* Returns true if there are no backlogged entries after the flush */
-static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
+static void __io_cqring_overflow_flush(struct io_ring_ctx *ctx)
{
- bool all_flushed;
size_t cqe_size = sizeof(struct io_uring_cqe);
- if (!force && __io_cqring_events(ctx) == ctx->cq_entries)
- return false;
+ if (__io_cqring_events(ctx) == ctx->cq_entries)
+ return;
if (ctx->flags & IORING_SETUP_CQE32)
cqe_size <<= 1;
@@ -617,43 +661,36 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
struct io_uring_cqe *cqe = io_get_cqe_overflow(ctx, true);
struct io_overflow_cqe *ocqe;
- if (!cqe && !force)
+ if (!cqe)
break;
ocqe = list_first_entry(&ctx->cq_overflow_list,
struct io_overflow_cqe, list);
- if (cqe)
- memcpy(cqe, &ocqe->cqe, cqe_size);
- else
- io_account_cq_overflow(ctx);
-
+ memcpy(cqe, &ocqe->cqe, cqe_size);
list_del(&ocqe->list);
kfree(ocqe);
}
- all_flushed = list_empty(&ctx->cq_overflow_list);
- if (all_flushed) {
+ if (list_empty(&ctx->cq_overflow_list)) {
clear_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
atomic_andnot(IORING_SQ_CQ_OVERFLOW, &ctx->rings->sq_flags);
}
-
io_cq_unlock_post(ctx);
- return all_flushed;
}
-static bool io_cqring_overflow_flush(struct io_ring_ctx *ctx)
+static void io_cqring_do_overflow_flush(struct io_ring_ctx *ctx)
{
- bool ret = true;
-
- if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
- /* iopoll syncs against uring_lock, not completion_lock */
- if (ctx->flags & IORING_SETUP_IOPOLL)
- mutex_lock(&ctx->uring_lock);
- ret = __io_cqring_overflow_flush(ctx, false);
- if (ctx->flags & IORING_SETUP_IOPOLL)
- mutex_unlock(&ctx->uring_lock);
- }
+ /* iopoll syncs against uring_lock, not completion_lock */
+ if (ctx->flags & IORING_SETUP_IOPOLL)
+ mutex_lock(&ctx->uring_lock);
+ __io_cqring_overflow_flush(ctx);
+ if (ctx->flags & IORING_SETUP_IOPOLL)
+ mutex_unlock(&ctx->uring_lock);
+}
- return ret;
+static void io_cqring_overflow_flush(struct io_ring_ctx *ctx)
+{
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+ io_cqring_do_overflow_flush(ctx);
}
void __io_put_task(struct task_struct *task, int nr)
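
Earlier in this diff, __io_cq_lock()/__io_cq_unlock() make taking completion_lock conditional on ctx->task_complete, since a DEFER_TASKRUN ring has a single task posting completions. A standalone sketch of that pattern with hypothetical pthread-based names, not kernel code:

/* Sketch of the conditional completion-lock pattern introduced by
 * __io_cq_lock()/__io_cq_unlock(): skip locking entirely when a single
 * task is known to post all completions. Hypothetical names. */
#include <pthread.h>
#include <stdbool.h>

struct cq_ctx {
	pthread_mutex_t completion_lock;
	bool task_complete;	/* DEFER_TASKRUN-style single completer */
};

static void cq_lock(struct cq_ctx *ctx)
{
	if (!ctx->task_complete)
		pthread_mutex_lock(&ctx->completion_lock);
}

static void cq_unlock(struct cq_ctx *ctx)
{
	if (!ctx->task_complete)
		pthread_mutex_unlock(&ctx->completion_lock);
}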
@@ -778,11 +815,14 @@ struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx, bool overflow)
return &rings->cqes[off];
}
-bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
- bool allow_overflow)
+static bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res,
+ u32 cflags)
{
struct io_uring_cqe *cqe;
+ if (!ctx->task_complete)
+ lockdep_assert_held(&ctx->completion_lock);
+
ctx->cq_extra++;
/*
@@ -804,34 +844,100 @@ bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags
}
return true;
}
-
- if (allow_overflow)
- return io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
-
return false;
}
-bool io_post_aux_cqe(struct io_ring_ctx *ctx,
- u64 user_data, s32 res, u32 cflags,
- bool allow_overflow)
+static void __io_flush_post_cqes(struct io_ring_ctx *ctx)
+ __must_hold(&ctx->uring_lock)
+{
+ struct io_submit_state *state = &ctx->submit_state;
+ unsigned int i;
+
+ lockdep_assert_held(&ctx->uring_lock);
+ for (i = 0; i < state->cqes_count; i++) {
+ struct io_uring_cqe *cqe = &state->cqes[i];
+
+ if (!io_fill_cqe_aux(ctx, cqe->user_data, cqe->res, cqe->flags)) {
+ if (ctx->task_complete) {
+ spin_lock(&ctx->completion_lock);
+ io_cqring_event_overflow(ctx, cqe->user_data,
+ cqe->res, cqe->flags, 0, 0);
+ spin_unlock(&ctx->completion_lock);
+ } else {
+ io_cqring_event_overflow(ctx, cqe->user_data,
+ cqe->res, cqe->flags, 0, 0);
+ }
+ }
+ }
+ state->cqes_count = 0;
+}
+
+static bool __io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
+ bool allow_overflow)
{
bool filled;
io_cq_lock(ctx);
- filled = io_fill_cqe_aux(ctx, user_data, res, cflags, allow_overflow);
+ filled = io_fill_cqe_aux(ctx, user_data, res, cflags);
+ if (!filled && allow_overflow)
+ filled = io_cqring_event_overflow(ctx, user_data, res, cflags, 0, 0);
+
io_cq_unlock_post(ctx);
return filled;
}
-static void __io_req_complete_put(struct io_kiocb *req)
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags)
{
+ return __io_post_aux_cqe(ctx, user_data, res, cflags, true);
+}
+
+bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags,
+ bool allow_overflow)
+{
+ struct io_uring_cqe *cqe;
+ unsigned int length;
+
+ if (!defer)
+ return __io_post_aux_cqe(ctx, user_data, res, cflags, allow_overflow);
+
+ length = ARRAY_SIZE(ctx->submit_state.cqes);
+
+ lockdep_assert_held(&ctx->uring_lock);
+
+ if (ctx->submit_state.cqes_count == length) {
+ __io_cq_lock(ctx);
+ __io_flush_post_cqes(ctx);
+ /* no need to flush - flush is deferred */
+ __io_cq_unlock_post(ctx);
+ }
+
+ /* For deferred completions this is not as strict as it is otherwise,
+ * however its main job is to prevent unbounded posted completions,
+ * and in that it works just as well.
+ */
+ if (!allow_overflow && test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq))
+ return false;
+
+ cqe = &ctx->submit_state.cqes[ctx->submit_state.cqes_count++];
+ cqe->user_data = user_data;
+ cqe->res = res;
+ cqe->flags = cflags;
+ return true;
+}
+
+static void __io_req_complete_post(struct io_kiocb *req)
+{
+ struct io_ring_ctx *ctx = req->ctx;
+
+ io_cq_lock(ctx);
+ if (!(req->flags & REQ_F_CQE_SKIP))
+ io_fill_cqe_req(ctx, req);
+
/*
* If we're the last reference to this request, add to our locked
* free_list cache.
*/
if (req_ref_put_and_test(req)) {
- struct io_ring_ctx *ctx = req->ctx;
-
if (req->flags & IO_REQ_LINK_FLAGS) {
if (req->flags & IO_DISARM_MASK)
io_disarm_next(req);
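
io_aux_cqe() above stages small auxiliary CQEs in ctx->submit_state.cqes and flushes the array via __io_flush_post_cqes() once it fills. A stripped-down userspace illustration of that stage-then-flush pattern, using hypothetical names rather than kernel code:

/* Stage entries in a fixed array and flush when full, mirroring the
 * cqes_count == ARRAY_SIZE(cqes) check above. Hypothetical types. */
#include <stdio.h>

#define STAGE_MAX 16

struct stage_cqe {
	unsigned long long user_data;
	int res;
	unsigned int flags;
};

struct stage_buf {
	struct stage_cqe cqes[STAGE_MAX];
	unsigned int count;
};

/* Placeholder flush: a real consumer would publish the staged entries. */
static void stage_flush(struct stage_buf *b)
{
	for (unsigned int i = 0; i < b->count; i++)
		printf("cqe: data=%llu res=%d\n", b->cqes[i].user_data, b->cqes[i].res);
	b->count = 0;
}

/* Queue one entry, flushing first if the staging array is already full. */
static void stage_post(struct stage_buf *b, unsigned long long user_data,
		       int res, unsigned int flags)
{
	if (b->count == STAGE_MAX)
		stage_flush(b);
	b->cqes[b->count++] = (struct stage_cqe){ user_data, res, flags };
}

int main(void)
{
	struct stage_buf buf = { .count = 0 };

	for (unsigned long long i = 0; i < 40; i++)
		stage_post(&buf, i, 0, 0);
	stage_flush(&buf);	/* final flush, like __io_flush_post_cqes() */
	return 0;
}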
@@ -852,38 +958,38 @@ static void __io_req_complete_put(struct io_kiocb *req)
wq_list_add_head(&req->comp_list, &ctx->locked_free_list);
ctx->locked_free_nr++;
}
-}
-
-void __io_req_complete_post(struct io_kiocb *req)
-{
- if (!(req->flags & REQ_F_CQE_SKIP))
- __io_fill_cqe_req(req->ctx, req);
- __io_req_complete_put(req);
-}
-
-void io_req_complete_post(struct io_kiocb *req)
-{
- struct io_ring_ctx *ctx = req->ctx;
-
- io_cq_lock(ctx);
- __io_req_complete_post(req);
io_cq_unlock_post(ctx);
}
-inline void __io_req_complete(struct io_kiocb *req, unsigned issue_flags)
+void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
- io_req_complete_post(req);
+ if (req->ctx->task_complete && (issue_flags & IO_URING_F_IOWQ)) {
+ req->io_task_work.func = io_req_task_complete;
+ io_req_task_work_add(req);
+ } else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
+ !(req->ctx->flags & IORING_SETUP_IOPOLL)) {
+ __io_req_complete_post(req);
+ } else {
+ struct io_ring_ctx *ctx = req->ctx;
+
+ mutex_lock(&ctx->uring_lock);
+ __io_req_complete_post(req);
+ mutex_unlock(&ctx->uring_lock);
+ }
}
-void io_req_complete_failed(struct io_kiocb *req, s32 res)
+void io_req_defer_failed(struct io_kiocb *req, s32 res)
+ __must_hold(&ctx->uring_lock)
{
const struct io_op_def *def = &io_op_defs[req->opcode];
+ lockdep_assert_held(&req->ctx->uring_lock);
+
req_set_fail(req);
io_req_set_res(req, res, io_put_kbuf(req, IO_URING_F_UNLOCKED));
if (def->fail)
def->fail(req);
- io_req_complete_post(req);
+ io_req_complete_defer(req);
}
/*
@@ -983,9 +1089,9 @@ static void __io_req_find_next_prep(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
- io_cq_lock(ctx);
+ spin_lock(&ctx->completion_lock);
io_disarm_next(req);
- io_cq_unlock_post(ctx);
+ spin_unlock(&ctx->completion_lock);
}
static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
@@ -1084,10 +1190,17 @@ void tctx_task_work(struct callback_head *cb)
struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
task_work);
struct llist_node fake = {};
- struct llist_node *node = io_llist_xchg(&tctx->task_list, &fake);
+ struct llist_node *node;
unsigned int loops = 1;
- unsigned int count = handle_tw_list(node, &ctx, &uring_locked, NULL);
+ unsigned int count;
+
+ if (unlikely(current->flags & PF_EXITING)) {
+ io_fallback_tw(tctx);
+ return;
+ }
+ node = io_llist_xchg(&tctx->task_list, &fake);
+ count = handle_tw_list(node, &ctx, &uring_locked, NULL);
node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
while (node != &fake) {
loops++;
@@ -1105,6 +1218,20 @@ void tctx_task_work(struct callback_head *cb)
trace_io_uring_task_work_run(tctx, count, loops);
}
+static __cold void io_fallback_tw(struct io_uring_task *tctx)
+{
+ struct llist_node *node = llist_del_all(&tctx->task_list);
+ struct io_kiocb *req;
+
+ while (node) {
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ node = node->next;
+ if (llist_add(&req->io_task_work.node,
+ &req->ctx->fallback_llist))
+ schedule_delayed_work(&req->ctx->fallback_work, 1);
+ }
+}
+
static void io_req_local_work_add(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
@@ -1127,11 +1254,10 @@ static void io_req_local_work_add(struct io_kiocb *req)
__io_cqring_wake(ctx);
}
-static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
+void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
- struct llist_node *node;
if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
io_req_local_work_add(req);
@@ -1148,20 +1274,7 @@ static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local
if (likely(!task_work_add(req->task, &tctx->task_work, ctx->notify_method)))
return;
- node = llist_del_all(&tctx->task_list);
-
- while (node) {
- req = container_of(node, struct io_kiocb, io_task_work.node);
- node = node->next;
- if (llist_add(&req->io_task_work.node,
- &req->ctx->fallback_llist))
- schedule_delayed_work(&req->ctx->fallback_work, 1);
- }
-}
-
-void io_req_task_work_add(struct io_kiocb *req)
-{
- __io_req_task_work_add(req, true);
+ io_fallback_tw(tctx);
}
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
@@ -1237,23 +1350,10 @@ int io_run_local_work(struct io_ring_ctx *ctx)
return ret;
}
-static void io_req_tw_post(struct io_kiocb *req, bool *locked)
-{
- io_req_complete_post(req);
-}
-
-void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags)
-{
- io_req_set_res(req, res, cflags);
- req->io_task_work.func = io_req_tw_post;
- io_req_task_work_add(req);
-}
-
static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
{
- /* not needed for normal modes, but SQPOLL depends on it */
io_tw_lock(req->ctx, locked);
- io_req_complete_failed(req, req->cqe.res);
+ io_req_defer_failed(req, req->cqe.res);
}
void io_req_task_submit(struct io_kiocb *req, bool *locked)
@@ -1263,7 +1363,7 @@ void io_req_task_submit(struct io_kiocb *req, bool *locked)
if (likely(!(req->task->flags & PF_EXITING)))
io_queue_sqe(req);
else
- io_req_complete_failed(req, -EFAULT);
+ io_req_defer_failed(req, -EFAULT);
}
void io_req_task_queue_fail(struct io_kiocb *req, int ret)
@@ -1343,18 +1443,31 @@ static void __io_submit_flush_completions(struct io_ring_ctx *ctx)
struct io_wq_work_node *node, *prev;
struct io_submit_state *state = &ctx->submit_state;
- io_cq_lock(ctx);
+ __io_cq_lock(ctx);
+ /* must come first to preserve CQE ordering in failure cases */
+ if (state->cqes_count)
+ __io_flush_post_cqes(ctx);
wq_list_for_each(node, prev, &state->compl_reqs) {
struct io_kiocb *req = container_of(node, struct io_kiocb,
comp_list);
- if (!(req->flags & REQ_F_CQE_SKIP))
- __io_fill_cqe_req(ctx, req);
+ if (!(req->flags & REQ_F_CQE_SKIP) &&
+ unlikely(!__io_fill_cqe_req(ctx, req))) {
+ if (ctx->task_complete) {
+ spin_lock(&ctx->completion_lock);
+ io_req_cqe_overflow(req);
+ spin_unlock(&ctx->completion_lock);
+ } else {
+ io_req_cqe_overflow(req);
+ }
+ }
}
__io_cq_unlock_post(ctx);
- io_free_batch_list(ctx, state->compl_reqs.first);
- INIT_WQ_LIST(&state->compl_reqs);
+ if (!wq_list_empty(&ctx->submit_state.compl_reqs)) {
+ io_free_batch_list(ctx, state->compl_reqs.first);
+ INIT_WQ_LIST(&state->compl_reqs);
+ }
}
/*
@@ -1420,7 +1533,7 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
check_cq = READ_ONCE(ctx->check_cq);
if (unlikely(check_cq)) {
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
- __io_cqring_overflow_flush(ctx, false);
+ __io_cqring_overflow_flush(ctx);
/*
* Similarly do not spin if we have not informed the user of any
* dropped CQE.
@@ -1476,16 +1589,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
void io_req_task_complete(struct io_kiocb *req, bool *locked)
{
- if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
- unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
-
- req->cqe.flags |= io_put_kbuf(req, issue_flags);
- }
-
if (*locked)
io_req_complete_defer(req);
else
- io_req_complete_post(req);
+ io_req_complete_post(req, IO_URING_F_UNLOCKED);
}
/*
@@ -1635,6 +1742,7 @@ static u32 io_get_sequence(struct io_kiocb *req)
}
static __cold void io_drain_req(struct io_kiocb *req)
+ __must_hold(&ctx->uring_lock)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_defer_entry *de;
@@ -1655,7 +1763,7 @@ queue:
ret = io_req_prep_async(req);
if (ret) {
fail:
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
return;
}
io_prep_async_link(req);
@@ -1752,12 +1860,12 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
if (issue_flags & IO_URING_F_COMPLETE_DEFER)
io_req_complete_defer(req);
else
- io_req_complete_post(req);
+ io_req_complete_post(req, issue_flags);
} else if (ret != IOU_ISSUE_SKIP_COMPLETE)
return ret;
/* If the op doesn't have a file, we're not polling for it */
- if ((req->ctx->flags & IORING_SETUP_IOPOLL) && req->file)
+ if ((req->ctx->flags & IORING_SETUP_IOPOLL) && def->iopoll_queue)
io_iopoll_req_issued(req, issue_flags);
return 0;
@@ -1766,9 +1874,8 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
int io_poll_issue(struct io_kiocb *req, bool *locked)
{
io_tw_lock(req->ctx, locked);
- if (unlikely(req->task->flags & PF_EXITING))
- return -EFAULT;
- return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT);
+ return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
+ IO_URING_F_COMPLETE_DEFER);
}
struct io_wq_work *io_wq_free_work(struct io_wq_work *work)
@@ -1783,11 +1890,11 @@ void io_wq_submit_work(struct io_wq_work *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
const struct io_op_def *def = &io_op_defs[req->opcode];
- unsigned int issue_flags = IO_URING_F_UNLOCKED;
+ unsigned int issue_flags = IO_URING_F_UNLOCKED | IO_URING_F_IOWQ;
bool needs_poll = false;
int ret = 0, err = -ECANCELED;
- /* one will be dropped by ->io_free_work() after returning to io-wq */
+ /* one will be dropped by ->io_wq_free_work() after returning to io-wq */
if (!(req->flags & REQ_F_REFCOUNT))
__io_req_set_refcount(req, 2);
else
@@ -1885,7 +1992,7 @@ static void io_queue_async(struct io_kiocb *req, int ret)
struct io_kiocb *linked_timeout;
if (ret != -EAGAIN || (req->flags & REQ_F_NOWAIT)) {
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
return;
}
@@ -1935,14 +2042,14 @@ static void io_queue_sqe_fallback(struct io_kiocb *req)
*/
req->flags &= ~REQ_F_HARDLINK;
req->flags |= REQ_F_LINK;
- io_req_complete_failed(req, req->cqe.res);
+ io_req_defer_failed(req, req->cqe.res);
} else if (unlikely(req->ctx->drain_active)) {
io_drain_req(req);
} else {
int ret = io_req_prep_async(req);
if (unlikely(ret))
- io_req_complete_failed(req, ret);
+ io_req_defer_failed(req, ret);
else
io_queue_iowq(req, NULL);
}
@@ -2378,7 +2485,14 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
}
if (!schedule_hrtimeout(&timeout, HRTIMER_MODE_ABS))
return -ETIME;
- return 1;
+
+ /*
+ * Run task_work after scheduling. If we got woken because of
+ * task_work being processed, run it now rather than let the caller
+ * do another wait loop.
+ */
+ ret = io_run_task_work_sig(ctx);
+ return ret < 0 ? ret : 1;
}
/*
@@ -2439,14 +2553,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
trace_io_uring_cqring_wait(ctx, min_events);
do {
- /* if we can't even flush overflow, don't wait for more */
- if (!io_cqring_overflow_flush(ctx)) {
- ret = -EBUSY;
- break;
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
+ finish_wait(&ctx->cq_wait, &iowq.wq);
+ io_cqring_do_overflow_flush(ctx);
}
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
TASK_INTERRUPTIBLE);
ret = io_cqring_wait_schedule(ctx, &iowq, timeout);
+ if (__io_cqring_events_user(ctx) >= min_events)
+ break;
cond_resched();
} while (ret > 0);
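
The wait loop above now flushes overflow before re-arming the waitqueue and breaks out as soon as enough CQEs are visible to userspace. For orientation, the userspace side of such a min_events wait, assuming liburing; illustrative only:

/* Submit pending SQEs, block until at least wait_nr completions exist,
 * then reap whatever is in the CQ ring. Assumes liburing. */
#include <liburing.h>

static int submit_and_collect(struct io_uring *ring, unsigned int wait_nr)
{
	struct io_uring_cqe *cqe;
	unsigned int head, seen = 0;
	int ret;

	/* This is the call that ends up waiting in io_cqring_wait(). */
	ret = io_uring_submit_and_wait(ring, wait_nr);
	if (ret < 0)
		return ret;

	io_uring_for_each_cqe(ring, head, cqe)
		seen++;
	io_uring_cq_advance(ring, seen);
	return seen;
}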
@@ -2594,8 +2709,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
__io_sqe_buffers_unregister(ctx);
if (ctx->file_data)
__io_sqe_files_unregister(ctx);
- if (ctx->rings)
- __io_cqring_overflow_flush(ctx, true);
+ io_cqring_overflow_kill(ctx);
io_eventfd_unregister(ctx);
io_alloc_cache_free(&ctx->apoll_cache, io_apoll_cache_free);
io_alloc_cache_free(&ctx->netmsg_cache, io_netmsg_cache_free);
@@ -2670,7 +2784,7 @@ static __poll_t io_uring_poll(struct file *file, poll_table *wait)
* lock(&ep->mtx);
*
* Users may get EPOLLIN meanwhile seeing nothing in cqring, this
- * pushs them to do the flush.
+ * pushes them to do the flush.
*/
if (io_cqring_events(ctx) || io_has_work(ctx))
@@ -2707,8 +2821,10 @@ static __cold void io_tctx_exit_cb(struct callback_head *cb)
/*
* When @in_idle, we're in cancellation and it's racy to remove the
* node. It'll be removed by the end of cancellation, just ignore it.
+ * tctx can be NULL if the queueing of this task_work raced with
+ * work cancellation off the exec path.
*/
- if (!atomic_read(&tctx->in_idle))
+ if (tctx && !atomic_read(&tctx->in_idle))
io_uring_del_tctx_node((unsigned long)work->ctx);
complete(&work->completion);
}
@@ -2736,6 +2852,12 @@ static __cold void io_ring_exit_work(struct work_struct *work)
* as nobody else will be looking for them.
*/
do {
+ if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
+ mutex_lock(&ctx->uring_lock);
+ io_cqring_overflow_kill(ctx);
+ mutex_unlock(&ctx->uring_lock);
+ }
+
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
io_move_task_work_from_local(ctx);
@@ -2801,8 +2923,6 @@ static __cold void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
mutex_lock(&ctx->uring_lock);
percpu_ref_kill(&ctx->refs);
- if (ctx->rings)
- __io_cqring_overflow_flush(ctx, true);
xa_for_each(&ctx->personalities, index, creds)
io_unregister_personality(ctx, index);
if (ctx->rings)
@@ -2869,7 +2989,7 @@ static __cold bool io_cancel_defer_files(struct io_ring_ctx *ctx,
while (!list_empty(&list)) {
de = list_first_entry(&list, struct io_defer_entry, list);
list_del_init(&de->list);
- io_req_complete_failed(de->req, -ECANCELED);
+ io_req_task_queue_fail(de->req, -ECANCELED);
kfree(de);
}
return true;
@@ -3444,6 +3564,11 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
if (!ctx)
return -ENOMEM;
+ if ((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
+ !(ctx->flags & IORING_SETUP_IOPOLL) &&
+ !(ctx->flags & IORING_SETUP_SQPOLL))
+ ctx->task_complete = true;
+
/*
* When SETUP_IOPOLL and SETUP_SQPOLL are both enabled, user
* space applications don't need to do io completion events
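
ctx->task_complete is only set above for DEFER_TASKRUN rings that use neither IOPOLL nor SQPOLL. A minimal liburing sketch of creating such a ring (DEFER_TASKRUN additionally requires SINGLE_ISSUER); illustrative only, not part of the patch:

/* Create a ring that meets the ctx->task_complete conditions added above:
 * DEFER_TASKRUN, no IOPOLL, no SQPOLL. Assumes liburing. */
#include <liburing.h>

static int make_task_complete_ring(struct io_uring *ring)
{
	return io_uring_queue_init(64, ring,
				   IORING_SETUP_SINGLE_ISSUER |
				   IORING_SETUP_DEFER_TASKRUN);
}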
@@ -3895,8 +4020,6 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
return -EEXIST;
if (ctx->restricted) {
- if (opcode >= IORING_REGISTER_LAST)
- return -EINVAL;
opcode = array_index_nospec(opcode, IORING_REGISTER_LAST);
if (!test_bit(opcode, ctx->restrictions.register_op))
return -EACCES;
@@ -4052,6 +4175,9 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
long ret = -EBADF;
struct fd f;
+ if (opcode >= IORING_REGISTER_LAST)
+ return -EINVAL;
+
f = fdget(fd);
if (!f.file)
return -EBADF;
@@ -4062,8 +4188,6 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
ctx = f.file->private_data;
- io_run_task_work_ctx(ctx);
-
mutex_lock(&ctx->uring_lock);
ret = __io_uring_register(ctx, opcode, arg, nr_args);
mutex_unlock(&ctx->uring_lock);