summaryrefslogtreecommitdiffstats
path: root/fs/io-wq.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/io-wq.c')
-rw-r--r--fs/io-wq.c71
1 files changed, 60 insertions, 11 deletions
diff --git a/fs/io-wq.c b/fs/io-wq.c
index c51691262208..5c4f582d6549 100644
--- a/fs/io-wq.c
+++ b/fs/io-wq.c
@@ -142,6 +142,7 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
+static void io_wq_cancel_tw_create(struct io_wq *wq);
static bool io_worker_get(struct io_worker *worker)
{
@@ -357,12 +358,22 @@ static bool io_queue_worker_create(struct io_worker *worker,
test_and_set_bit_lock(0, &worker->create_state))
goto fail_release;
+ atomic_inc(&wq->worker_refs);
init_task_work(&worker->create_work, func);
worker->create_index = acct->index;
if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
- clear_bit_unlock(0, &worker->create_state);
+ /*
+ * EXIT may have been set after checking it above, check after
+ * adding the task_work and remove any creation item if it is
+ * now set. wq exit does that too, but we can have added this
+ * work item after we canceled in io_wq_exit_workers().
+ */
+ if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
+ io_wq_cancel_tw_create(wq);
+ io_worker_ref_put(wq);
return true;
}
+ io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
fail_release:
io_worker_release(worker);
@@ -384,7 +395,9 @@ static void io_wqe_dec_running(struct io_worker *worker)
if (atomic_dec_and_test(&acct->nr_running) && io_acct_run_queue(acct)) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
+ raw_spin_unlock(&wqe->lock);
io_queue_worker_create(worker, acct, create_worker_cb);
+ raw_spin_lock(&wqe->lock);
}
}
@@ -423,9 +436,10 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
return work->flags >> IO_WQ_HASH_SHIFT;
}
-static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
{
struct io_wq *wq = wqe->wq;
+ bool ret = false;
spin_lock_irq(&wq->hash->wait.lock);
if (list_empty(&wqe->wait.entry)) {
@@ -433,9 +447,11 @@ static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
if (!test_bit(hash, &wq->hash->map)) {
__set_current_state(TASK_RUNNING);
list_del_init(&wqe->wait.entry);
+ ret = true;
}
}
spin_unlock_irq(&wq->hash->wait.lock);
+ return ret;
}
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
@@ -475,14 +491,21 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
}
if (stall_hash != -1U) {
+ bool unstalled;
+
/*
* Set this before dropping the lock to avoid racing with new
* work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&wqe->lock);
- io_wait_on_hash(wqe, stall_hash);
+ unstalled = io_wait_on_hash(wqe, stall_hash);
raw_spin_lock(&wqe->lock);
+ if (unstalled) {
+ clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ if (wq_has_sleeper(&wqe->wq->hash->wait))
+ wake_up(&wqe->wq->hash->wait);
+ }
}
return NULL;
@@ -564,8 +587,11 @@ get_next:
io_wqe_enqueue(wqe, linked);
if (hash != -1U && !next_hashed) {
+ /* serialize hash clear with wake_up() */
+ spin_lock_irq(&wq->hash->wait.lock);
clear_bit(hash, &wq->hash->map);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
+ spin_unlock_irq(&wq->hash->wait.lock);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
raw_spin_lock(&wqe->lock);
@@ -701,6 +727,13 @@ static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
static inline bool io_should_retry_thread(long err)
{
+ /*
+ * Prevent perpetual task_work retry, if the task (or its group) is
+ * exiting.
+ */
+ if (fatal_signal_pending(current))
+ return false;
+
switch (err) {
case -EAGAIN:
case -ERESTARTSYS:
@@ -1178,13 +1211,9 @@ void io_wq_exit_start(struct io_wq *wq)
set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
-static void io_wq_exit_workers(struct io_wq *wq)
+static void io_wq_cancel_tw_create(struct io_wq *wq)
{
struct callback_head *cb;
- int node;
-
- if (!wq->task)
- return;
while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
struct io_worker *worker;
@@ -1192,6 +1221,16 @@ static void io_wq_exit_workers(struct io_wq *wq)
worker = container_of(cb, struct io_worker, create_work);
io_worker_cancel_cb(worker);
}
+}
+
+static void io_wq_exit_workers(struct io_wq *wq)
+{
+ int node;
+
+ if (!wq->task)
+ return;
+
+ io_wq_cancel_tw_create(wq);
rcu_read_lock();
for_each_node(node) {
@@ -1308,7 +1347,9 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
*/
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
- int i, node, prev = 0;
+ int prev[IO_WQ_ACCT_NR];
+ bool first_node = true;
+ int i, node;
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
@@ -1319,6 +1360,9 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
new_count[i] = task_rlimit(current, RLIMIT_NPROC);
}
+ for (i = 0; i < IO_WQ_ACCT_NR; i++)
+ prev[i] = 0;
+
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
@@ -1327,14 +1371,19 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wqe->acct[i];
- prev = max_t(int, acct->max_workers, prev);
+ if (first_node)
+ prev[i] = max_t(int, acct->max_workers, prev[i]);
if (new_count[i])
acct->max_workers = new_count[i];
- new_count[i] = prev;
}
raw_spin_unlock(&wqe->lock);
+ first_node = false;
}
rcu_read_unlock();
+
+ for (i = 0; i < IO_WQ_ACCT_NR; i++)
+ new_count[i] = prev[i];
+
return 0;
}