summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/io-wq.c296
-rw-r--r--fs/io-wq.h4
-rw-r--r--fs/io_uring.c2
3 files changed, 220 insertions, 82 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index 9b375009a553..33b14b85752b 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -27,6 +27,7 @@ enum {
IO_WORKER_F_FREE = 4, /* worker on free list */
IO_WORKER_F_EXITING = 8, /* worker exiting */
IO_WORKER_F_FIXED = 16, /* static idle worker */
+ IO_WORKER_F_BOUND = 32, /* is doing bounded work */
};
enum {
@@ -66,6 +67,17 @@ struct io_wq_nulls_list {
#define IO_WQ_HASH_ORDER 5
#endif
+struct io_wqe_acct {
+ unsigned nr_workers;
+ unsigned max_workers;
+ atomic_t nr_running;
+};
+
+enum {
+ IO_WQ_ACCT_BOUND,
+ IO_WQ_ACCT_UNBOUND,
+};
+
/*
* Per-node worker thread pool
*/
@@ -78,9 +90,7 @@ struct io_wqe {
} ____cacheline_aligned_in_smp;
int node;
- unsigned nr_workers;
- unsigned max_workers;
- atomic_t nr_running;
+ struct io_wqe_acct acct[2];
struct io_wq_nulls_list free_list;
struct io_wq_nulls_list busy_list;
@@ -97,6 +107,7 @@ struct io_wq {
unsigned nr_wqes;
struct task_struct *manager;
+ struct user_struct *user;
struct mm_struct *mm;
refcount_t refs;
struct completion done;
@@ -152,10 +163,29 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker)
return dropped_lock;
}
+static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
+ struct io_wq_work *work)
+{
+ if (work->flags & IO_WQ_WORK_UNBOUND)
+ return &wqe->acct[IO_WQ_ACCT_UNBOUND];
+
+ return &wqe->acct[IO_WQ_ACCT_BOUND];
+}
+
+static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe,
+ struct io_worker *worker)
+{
+ if (worker->flags & IO_WORKER_F_BOUND)
+ return &wqe->acct[IO_WQ_ACCT_BOUND];
+
+ return &wqe->acct[IO_WQ_ACCT_UNBOUND];
+}
+
static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
- bool all_done = false;
+ struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
+ unsigned nr_workers;
/*
* If we're not at zero, someone else is holding a brief reference
@@ -169,7 +199,9 @@ static void io_worker_exit(struct io_worker *worker)
preempt_disable();
current->flags &= ~PF_IO_WORKER;
if (worker->flags & IO_WORKER_F_RUNNING)
- atomic_dec(&wqe->nr_running);
+ atomic_dec(&acct->nr_running);
+ if (!(worker->flags & IO_WORKER_F_BOUND))
+ atomic_dec(&wqe->wq->user->processes);
worker->flags = 0;
preempt_enable();
@@ -179,17 +211,88 @@ static void io_worker_exit(struct io_worker *worker)
__release(&wqe->lock);
spin_lock_irq(&wqe->lock);
}
- wqe->nr_workers--;
- all_done = !wqe->nr_workers;
+ acct->nr_workers--;
+ nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers +
+ wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers;
spin_unlock_irq(&wqe->lock);
/* all workers gone, wq exit can proceed */
- if (all_done && refcount_dec_and_test(&wqe->wq->refs))
+ if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs))
complete(&wqe->wq->done);
kfree_rcu(worker, rcu);
}
+static inline bool io_wqe_run_queue(struct io_wqe *wqe)
+ __must_hold(wqe->lock)
+{
+ if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
+ return true;
+ return false;
+}
+
+/*
+ * Check head of free list for an available worker. If one isn't available,
+ * caller must wake up the wq manager to create one.
+ */
+static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
+ __must_hold(RCU)
+{
+ struct hlist_nulls_node *n;
+ struct io_worker *worker;
+
+ n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head));
+ if (is_a_nulls(n))
+ return false;
+
+ worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
+ if (io_worker_get(worker)) {
+ wake_up(&worker->wait);
+ io_worker_release(worker);
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * We need a worker. If we find a free one, we're good. If not, and we're
+ * below the max number of workers, wake up the manager to create one.
+ */
+static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
+{
+ bool ret;
+
+ /*
+ * Most likely an attempt to queue unbounded work on an io_wq that
+ * wasn't setup with any unbounded workers.
+ */
+ WARN_ON_ONCE(!acct->max_workers);
+
+ rcu_read_lock();
+ ret = io_wqe_activate_free_worker(wqe);
+ rcu_read_unlock();
+
+ if (!ret && acct->nr_workers < acct->max_workers)
+ wake_up_process(wqe->wq->manager);
+}
+
+static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker)
+{
+ struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
+
+ atomic_inc(&acct->nr_running);
+}
+
+static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker)
+ __must_hold(wqe->lock)
+{
+ struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker);
+
+ if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
+ io_wqe_wake_worker(wqe, acct);
+}
+
static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
{
allow_kernel_signal(SIGINT);
@@ -198,7 +301,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker)
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
worker->restore_files = current->files;
- atomic_inc(&wqe->nr_running);
+ io_wqe_inc_running(wqe, worker);
}
/*
@@ -209,6 +312,8 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
struct io_wq_work *work)
__must_hold(wqe->lock)
{
+ bool worker_bound, work_bound;
+
if (worker->flags & IO_WORKER_F_FREE) {
worker->flags &= ~IO_WORKER_F_FREE;
hlist_nulls_del_init_rcu(&worker->nulls_node);
@@ -216,6 +321,28 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
&wqe->busy_list.head);
}
worker->cur_work = work;
+
+ /*
+ * If worker is moving from bound to unbound (or vice versa), then
+ * ensure we update the running accounting.
+ */
+ worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
+ work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
+ if (worker_bound != work_bound) {
+ io_wqe_dec_running(wqe, worker);
+ if (work_bound) {
+ worker->flags |= IO_WORKER_F_BOUND;
+ wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
+ wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
+ atomic_dec(&wqe->wq->user->processes);
+ } else {
+ worker->flags &= ~IO_WORKER_F_BOUND;
+ wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
+ wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
+ atomic_inc(&wqe->wq->user->processes);
+ }
+ io_wqe_inc_running(wqe, worker);
+ }
}
/*
@@ -335,14 +462,6 @@ next:
} while (1);
}
-static inline bool io_wqe_run_queue(struct io_wqe *wqe)
- __must_hold(wqe->lock)
-{
- if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED))
- return true;
- return false;
-}
-
static int io_wqe_worker(void *data)
{
struct io_worker *worker = data;
@@ -392,46 +511,6 @@ static int io_wqe_worker(void *data)
}
/*
- * Check head of free list for an available worker. If one isn't available,
- * caller must wake up the wq manager to create one.
- */
-static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
- __must_hold(RCU)
-{
- struct hlist_nulls_node *n;
- struct io_worker *worker;
-
- n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head));
- if (is_a_nulls(n))
- return false;
-
- worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
- if (io_worker_get(worker)) {
- wake_up(&worker->wait);
- io_worker_release(worker);
- return true;
- }
-
- return false;
-}
-
-/*
- * We need a worker. If we find a free one, we're good. If not, and we're
- * below the max number of workers, wake up the manager to create one.
- */
-static void io_wqe_wake_worker(struct io_wqe *wqe)
-{
- bool ret;
-
- rcu_read_lock();
- ret = io_wqe_activate_free_worker(wqe);
- rcu_read_unlock();
-
- if (!ret && wqe->nr_workers < wqe->max_workers)
- wake_up_process(wqe->wq->manager);
-}
-
-/*
* Called when a worker is scheduled in. Mark us as currently running.
*/
void io_wq_worker_running(struct task_struct *tsk)
@@ -444,7 +523,7 @@ void io_wq_worker_running(struct task_struct *tsk)
if (worker->flags & IO_WORKER_F_RUNNING)
return;
worker->flags |= IO_WORKER_F_RUNNING;
- atomic_inc(&wqe->nr_running);
+ io_wqe_inc_running(wqe, worker);
}
/*
@@ -465,13 +544,13 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
worker->flags &= ~IO_WORKER_F_RUNNING;
spin_lock_irq(&wqe->lock);
- if (atomic_dec_and_test(&wqe->nr_running) && io_wqe_run_queue(wqe))
- io_wqe_wake_worker(wqe);
+ io_wqe_dec_running(wqe, worker);
spin_unlock_irq(&wqe->lock);
}
-static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
+static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
+ struct io_wqe_acct *acct =&wqe->acct[index];
struct io_worker *worker;
worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node);
@@ -484,7 +563,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
worker->wqe = wqe;
worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node,
- "io_wqe_worker-%d", wqe->node);
+ "io_wqe_worker-%d/%d", index, wqe->node);
if (IS_ERR(worker->task)) {
kfree(worker);
return;
@@ -493,24 +572,31 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe)
spin_lock_irq(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head);
worker->flags |= IO_WORKER_F_FREE;
- if (!wqe->nr_workers)
+ if (index == IO_WQ_ACCT_BOUND)
+ worker->flags |= IO_WORKER_F_BOUND;
+ if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
- wqe->nr_workers++;
+ acct->nr_workers++;
spin_unlock_irq(&wqe->lock);
+ if (index == IO_WQ_ACCT_UNBOUND)
+ atomic_inc(&wq->user->processes);
+
wake_up_process(worker->task);
}
-static inline bool io_wqe_need_new_worker(struct io_wqe *wqe)
+static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
__must_hold(wqe->lock)
{
- if (!wqe->nr_workers)
- return true;
- if (hlist_nulls_empty(&wqe->free_list.head) &&
- wqe->nr_workers < wqe->max_workers && io_wqe_run_queue(wqe))
- return true;
+ struct io_wqe_acct *acct = &wqe->acct[index];
- return false;
+ /* always ensure we have one bounded worker */
+ if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers)
+ return true;
+ /* if we have available workers or no work, no need */
+ if (!hlist_nulls_empty(&wqe->free_list.head) || !io_wqe_run_queue(wqe))
+ return false;
+ return acct->nr_workers < acct->max_workers;
}
/*
@@ -525,13 +611,18 @@ static int io_wq_manager(void *data)
for (i = 0; i < wq->nr_wqes; i++) {
struct io_wqe *wqe = wq->wqes[i];
- bool fork_worker = false;
+ bool fork_worker[2] = { false, false };
spin_lock_irq(&wqe->lock);
- fork_worker = io_wqe_need_new_worker(wqe);
+ if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
+ fork_worker[IO_WQ_ACCT_BOUND] = true;
+ if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
+ fork_worker[IO_WQ_ACCT_UNBOUND] = true;
spin_unlock_irq(&wqe->lock);
- if (fork_worker)
- create_io_worker(wq, wqe);
+ if (fork_worker[IO_WQ_ACCT_BOUND])
+ create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
+ if (fork_worker[IO_WQ_ACCT_UNBOUND])
+ create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
}
set_current_state(TASK_INTERRUPTIBLE);
schedule_timeout(HZ);
@@ -540,17 +631,53 @@ static int io_wq_manager(void *data)
return 0;
}
+static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
+ struct io_wq_work *work)
+{
+ bool free_worker;
+
+ if (!(work->flags & IO_WQ_WORK_UNBOUND))
+ return true;
+ if (atomic_read(&acct->nr_running))
+ return true;
+
+ rcu_read_lock();
+ free_worker = !hlist_nulls_empty(&wqe->free_list.head);
+ rcu_read_unlock();
+ if (free_worker)
+ return true;
+
+ if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers &&
+ !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN)))
+ return false;
+
+ return true;
+}
+
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
+ struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
unsigned long flags;
+ /*
+ * Do early check to see if we need a new unbound worker, and if we do,
+ * if we're allowed to do so. This isn't 100% accurate as there's a
+ * gap between this check and incrementing the value, but that's OK.
+ * It's close enough to not be an issue, fork() has the same delay.
+ */
+ if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
+ work->flags |= IO_WQ_WORK_CANCEL;
+ work->func(&work);
+ return;
+ }
+
spin_lock_irqsave(&wqe->lock, flags);
list_add_tail(&work->list, &wqe->work_list);
wqe->flags &= ~IO_WQE_FLAG_STALLED;
spin_unlock_irqrestore(&wqe->lock, flags);
- if (!atomic_read(&wqe->nr_running))
- io_wqe_wake_worker(wqe);
+ if (!atomic_read(&acct->nr_running))
+ io_wqe_wake_worker(wqe, acct);
}
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
@@ -828,7 +955,8 @@ void io_wq_flush(struct io_wq *wq)
}
}
-struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
+struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
+ struct user_struct *user)
{
int ret = -ENOMEM, i, node;
struct io_wq *wq;
@@ -844,6 +972,9 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
return ERR_PTR(-ENOMEM);
}
+ /* caller must already hold a reference to this */
+ wq->user = user;
+
i = 0;
refcount_set(&wq->refs, wq->nr_wqes);
for_each_online_node(node) {
@@ -854,7 +985,13 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
break;
wq->wqes[i] = wqe;
wqe->node = node;
- wqe->max_workers = concurrency;
+ wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
+ atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
+ if (user) {
+ wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
+ task_rlimit(current, RLIMIT_NPROC);
+ }
+ atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
wqe->node = node;
wqe->wq = wq;
spin_lock_init(&wqe->lock);
@@ -863,7 +1000,6 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm)
wqe->free_list.nulls = 0;
INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1);
wqe->busy_list.nulls = 1;
- atomic_set(&wqe->nr_running, 0);
i++;
}
diff --git a/fs/io-wq.h b/fs/io-wq.h
index 3de192dc73fc..8cb345256f35 100644
--- a/fs/io-wq.h
+++ b/fs/io-wq.h
@@ -9,6 +9,7 @@ enum {
IO_WQ_WORK_HASHED = 4,
IO_WQ_WORK_NEEDS_USER = 8,
IO_WQ_WORK_NEEDS_FILES = 16,
+ IO_WQ_WORK_UNBOUND = 32,
IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */
};
@@ -33,7 +34,8 @@ struct io_wq_work {
(work)->files = NULL; \
} while (0) \
-struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm);
+struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
+ struct user_struct *user);
void io_wq_destroy(struct io_wq *wq);
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work);
diff --git a/fs/io_uring.c b/fs/io_uring.c
index 4d89a2f222bf..831bea0fbc75 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -3745,7 +3745,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
/* Do QD, or 4 * CPUS, whatever is smallest */
concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
- ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
+ ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, NULL);
if (IS_ERR(ctx->io_wq)) {
ret = PTR_ERR(ctx->io_wq);
ctx->io_wq = NULL;