summaryrefslogtreecommitdiffstats
path: root/kernel/rcu/srcu.c
diff options
context:
space:
mode:
authorPaul E. McKenney <paulmck@linux.vnet.ibm.com>2017-03-25 17:23:44 -0700
committerPaul E. McKenney <paulmck@linux.vnet.ibm.com>2017-04-18 11:38:23 -0700
commitdad81a2026841b5e2651aab58a7398c13cc05847 (patch)
treef666432234764e97893d37155429223e2c44bd8c /kernel/rcu/srcu.c
parent32071141b2448458479932fe726ce892cbe1b4e4 (diff)
downloadlinux-stable-dad81a2026841b5e2651aab58a7398c13cc05847.tar.gz
linux-stable-dad81a2026841b5e2651aab58a7398c13cc05847.tar.bz2
linux-stable-dad81a2026841b5e2651aab58a7398c13cc05847.zip
srcu: Introduce CLASSIC_SRCU Kconfig option
The TREE_SRCU rewrite is large and a bit on the non-simple side, so this commit helps reduce risk by allowing the old v4.11 SRCU algorithm to be selected using a new CLASSIC_SRCU Kconfig option that depends on RCU_EXPERT. The default is to use the new TREE_SRCU and TINY_SRCU algorithms, in order to help get these the testing that they need. However, if your users do not require the update-side scalability that is to be provided by TREE_SRCU, select RCU_EXPERT and then CLASSIC_SRCU to revert back to the old classic SRCU algorithm. Signed-off-by: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Diffstat (limited to 'kernel/rcu/srcu.c')
-rw-r--r--kernel/rcu/srcu.c347
1 files changed, 196 insertions, 151 deletions
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 3cfcc59bddf3..584d8a983883 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -36,16 +36,75 @@
#include <linux/delay.h>
#include <linux/srcu.h>
-#include <linux/rcu_node_tree.h>
#include "rcu.h"
+/*
+ * Initialize an rcu_batch structure to empty.
+ */
+static inline void rcu_batch_init(struct rcu_batch *b)
+{
+ b->head = NULL;
+ b->tail = &b->head;
+}
+
+/*
+ * Enqueue a callback onto the tail of the specified rcu_batch structure.
+ */
+static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
+{
+ *b->tail = head;
+ b->tail = &head->next;
+}
+
+/*
+ * Is the specified rcu_batch structure empty?
+ */
+static inline bool rcu_batch_empty(struct rcu_batch *b)
+{
+ return b->tail == &b->head;
+}
+
+/*
+ * Remove the callback at the head of the specified rcu_batch structure
+ * and return a pointer to it, or return NULL if the structure is empty.
+ */
+static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
+{
+ struct rcu_head *head;
+
+ if (rcu_batch_empty(b))
+ return NULL;
+
+ head = b->head;
+ b->head = head->next;
+ if (b->tail == &head->next)
+ rcu_batch_init(b);
+
+ return head;
+}
+
+/*
+ * Move all callbacks from the rcu_batch structure specified by "from" to
+ * the structure specified by "to".
+ */
+static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
+{
+ if (!rcu_batch_empty(from)) {
+ *to->tail = from->head;
+ to->tail = from->tail;
+ rcu_batch_init(from);
+ }
+}
+
static int init_srcu_struct_fields(struct srcu_struct *sp)
{
sp->completed = 0;
- sp->srcu_gp_seq = 0;
- atomic_set(&sp->srcu_exp_cnt, 0);
spin_lock_init(&sp->queue_lock);
- rcu_segcblist_init(&sp->srcu_cblist);
+ sp->running = false;
+ rcu_batch_init(&sp->batch_queue);
+ rcu_batch_init(&sp->batch_check0);
+ rcu_batch_init(&sp->batch_check1);
+ rcu_batch_init(&sp->batch_done);
INIT_DELAYED_WORK(&sp->work, process_srcu);
sp->per_cpu_ref = alloc_percpu(struct srcu_array);
return sp->per_cpu_ref ? 0 : -ENOMEM;
@@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp)
return sum;
}
-#define SRCU_INTERVAL 1
-
/**
* cleanup_srcu_struct - deconstruct a sleep-RCU structure
* @sp: structure to clean up.
@@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp)
*/
void cleanup_srcu_struct(struct srcu_struct *sp)
{
- WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
if (WARN_ON(srcu_readers_active(sp)))
return; /* Leakage unless caller handles error. */
- if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
- return; /* Leakage unless caller handles error. */
- flush_delayed_work(&sp->work);
- if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
- pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
- return; /* Caller forgot to stop doing call_srcu()? */
- }
free_percpu(sp->per_cpu_ref);
sp->per_cpu_ref = NULL;
}
@@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
* We use an adaptive strategy for synchronize_srcu() and especially for
* synchronize_srcu_expedited(). We spin for a fixed time period
* (defined below) to allow SRCU readers to exit their read-side critical
- * sections. If there are still some readers after a few microseconds,
- * we repeatedly block for 1-millisecond time periods.
+ * sections. If there are still some readers after 10 microseconds,
+ * we repeatedly block for 1-millisecond time periods. This approach
+ * has done well in testing, so there is no need for a config parameter.
*/
#define SRCU_RETRY_CHECK_DELAY 5
+#define SYNCHRONIZE_SRCU_TRYCOUNT 2
+#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT 12
/*
- * Start an SRCU grace period.
- */
-static void srcu_gp_start(struct srcu_struct *sp)
-{
- int state;
-
- rcu_segcblist_accelerate(&sp->srcu_cblist,
- rcu_seq_snap(&sp->srcu_gp_seq));
- rcu_seq_start(&sp->srcu_gp_seq);
- state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
-}
-
-/*
- * Wait until all readers counted by array index idx complete, but
- * loop an additional time if there is an expedited grace period pending.
- * The caller must ensure that ->completed is not changed while checking.
+ * @@@ Wait until all pre-existing readers complete. Such readers
+ * will have used the index specified by "idx".
+ * the caller should ensures the ->completed is not changed while checking
+ * and idx = (->completed & 1) ^ 1
*/
static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
{
for (;;) {
if (srcu_readers_active_idx_check(sp, idx))
return true;
- if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+ if (--trycount <= 0)
return false;
udelay(SRCU_RETRY_CHECK_DELAY);
}
@@ -300,19 +339,6 @@ static void srcu_flip(struct srcu_struct *sp)
}
/*
- * End an SRCU grace period.
- */
-static void srcu_gp_end(struct srcu_struct *sp)
-{
- rcu_seq_end(&sp->srcu_gp_seq);
-
- spin_lock_irq(&sp->queue_lock);
- rcu_segcblist_advance(&sp->srcu_cblist,
- rcu_seq_current(&sp->srcu_gp_seq));
- spin_unlock_irq(&sp->queue_lock);
-}
-
-/*
* Enqueue an SRCU callback on the specified srcu_struct structure,
* initiating grace-period processing if it is not already running.
*
@@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
head->func = func;
spin_lock_irqsave(&sp->queue_lock, flags);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
- srcu_gp_start(sp);
+ rcu_batch_queue(&sp->batch_queue, head);
+ if (!sp->running) {
+ sp->running = true;
queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
}
spin_unlock_irqrestore(&sp->queue_lock, flags);
}
EXPORT_SYMBOL_GPL(call_srcu);
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
+static void srcu_reschedule(struct srcu_struct *sp);
/*
* Helper function for synchronize_srcu() and synchronize_srcu_expedited().
*/
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
{
struct rcu_synchronize rcu;
struct rcu_head *head = &rcu.head;
+ bool done = false;
RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
lock_is_held(&rcu_bh_lock_map) ||
@@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp)
lock_is_held(&rcu_sched_lock_map),
"Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
- if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
- return;
might_sleep();
init_completion(&rcu.completion);
@@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp)
head->func = wakeme_after_rcu;
spin_lock_irq(&sp->queue_lock);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+ if (!sp->running) {
/* steal the processing owner */
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
- srcu_gp_start(sp);
+ sp->running = true;
+ rcu_batch_queue(&sp->batch_check0, head);
spin_unlock_irq(&sp->queue_lock);
+
+ srcu_advance_batches(sp, trycount);
+ if (!rcu_batch_empty(&sp->batch_done)) {
+ BUG_ON(sp->batch_done.head != head);
+ rcu_batch_dequeue(&sp->batch_done);
+ done = true;
+ }
/* give the processing owner to work_struct */
- srcu_reschedule(sp, 0);
+ srcu_reschedule(sp);
} else {
- rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+ rcu_batch_queue(&sp->batch_queue, head);
spin_unlock_irq(&sp->queue_lock);
}
- wait_for_completion(&rcu.completion);
- smp_mb(); /* Caller's later accesses after GP. */
-}
-
-/**
- * synchronize_srcu_expedited - Brute-force SRCU grace period
- * @sp: srcu_struct with which to synchronize.
- *
- * Wait for an SRCU grace period to elapse, but be more aggressive about
- * spinning rather than blocking when waiting.
- *
- * Note that synchronize_srcu_expedited() has the same deadlock and
- * memory-ordering properties as does synchronize_srcu().
- */
-void synchronize_srcu_expedited(struct srcu_struct *sp)
-{
- bool do_norm = rcu_gp_is_normal();
-
- if (!do_norm) {
- atomic_inc(&sp->srcu_exp_cnt);
- smp_mb__after_atomic(); /* increment before GP. */
- }
- __synchronize_srcu(sp);
- if (!do_norm) {
- smp_mb__before_atomic(); /* GP before decrement. */
- atomic_dec(&sp->srcu_exp_cnt);
+ if (!done) {
+ wait_for_completion(&rcu.completion);
+ smp_mb(); /* Caller's later accesses after GP. */
}
+
}
-EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
/**
* synchronize_srcu - wait for prior SRCU read-side critical-section completion
@@ -465,14 +475,29 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
*/
void synchronize_srcu(struct srcu_struct *sp)
{
- if (rcu_gp_is_expedited())
- synchronize_srcu_expedited(sp);
- else
- __synchronize_srcu(sp);
+ __synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal())
+ ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
+ : SYNCHRONIZE_SRCU_TRYCOUNT);
}
EXPORT_SYMBOL_GPL(synchronize_srcu);
/**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, but be more aggressive about
+ * spinning rather than blocking when waiting.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+ __synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
* srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
* @sp: srcu_struct on which to wait for in-flight callbacks.
*/
@@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp)
}
EXPORT_SYMBOL_GPL(srcu_batches_completed);
+#define SRCU_CALLBACK_BATCH 10
+#define SRCU_INTERVAL 1
+
+/*
+ * Move any new SRCU callbacks to the first stage of the SRCU grace
+ * period pipeline.
+ */
+static void srcu_collect_new(struct srcu_struct *sp)
+{
+ if (!rcu_batch_empty(&sp->batch_queue)) {
+ spin_lock_irq(&sp->queue_lock);
+ rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
+ spin_unlock_irq(&sp->queue_lock);
+ }
+}
+
/*
* Core SRCU state machine. Advance callbacks from ->batch_check0 to
* ->batch_check1 and then to ->batch_done as readers drain.
*/
-static void srcu_advance_batches(struct srcu_struct *sp)
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
{
- int idx;
+ int idx = 1 ^ (sp->completed & 1);
/*
* Because readers might be delayed for an extended period after
@@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp)
* might well be readers using both idx=0 and idx=1. We therefore
* need to wait for readers to clear from both index values before
* invoking a callback.
- *
- * The load-acquire ensures that we see the accesses performed
- * by the prior grace period.
*/
- idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
- if (idx == SRCU_STATE_IDLE) {
- spin_lock_irq(&sp->queue_lock);
- if (rcu_segcblist_empty(&sp->srcu_cblist)) {
- spin_unlock_irq(&sp->queue_lock);
- return;
- }
- idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- if (idx == SRCU_STATE_IDLE)
- srcu_gp_start(sp);
- spin_unlock_irq(&sp->queue_lock);
- if (idx != SRCU_STATE_IDLE)
- return; /* Someone else started the grace period. */
- }
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
- idx = 1 ^ (sp->completed & 1);
- if (!try_check_zero(sp, idx, 1))
- return; /* readers present, retry later. */
- srcu_flip(sp);
- rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
- }
+ if (rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_check1))
+ return; /* no callbacks need to be advanced */
- if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+ if (!try_check_zero(sp, idx, trycount))
+ return; /* failed to advance, will try after SRCU_INTERVAL */
- /*
- * SRCU read-side critical sections are normally short,
- * so check at least twice in quick succession after a flip.
- */
- idx = 1 ^ (sp->completed & 1);
- if (!try_check_zero(sp, idx, 2))
- return; /* readers present, retry after later. */
- srcu_gp_end(sp);
- }
+ /*
+ * The callbacks in ->batch_check1 have already done with their
+ * first zero check and flip back when they were enqueued on
+ * ->batch_check0 in a previous invocation of srcu_advance_batches().
+ * (Presumably try_check_zero() returned false during that
+ * invocation, leaving the callbacks stranded on ->batch_check1.)
+ * They are therefore ready to invoke, so move them to ->batch_done.
+ */
+ rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+ if (rcu_batch_empty(&sp->batch_check0))
+ return; /* no callbacks need to be advanced */
+ srcu_flip(sp);
+
+ /*
+ * The callbacks in ->batch_check0 just finished their
+ * first check zero and flip, so move them to ->batch_check1
+ * for future checking on the other idx.
+ */
+ rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+ /*
+ * SRCU read-side critical sections are normally short, so check
+ * at least twice in quick succession after a flip.
+ */
+ trycount = trycount < 2 ? 2 : trycount;
+ if (!try_check_zero(sp, idx^1, trycount))
+ return; /* failed to advance, will try after SRCU_INTERVAL */
+
+ /*
+ * The callbacks in ->batch_check1 have now waited for all
+ * pre-existing readers using both idx values. They are therefore
+ * ready to invoke, so move them to ->batch_done.
+ */
+ rcu_batch_move(&sp->batch_done, &sp->batch_check1);
}
/*
@@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp)
*/
static void srcu_invoke_callbacks(struct srcu_struct *sp)
{
- struct rcu_cblist ready_cbs;
- struct rcu_head *rhp;
+ int i;
+ struct rcu_head *head;
- spin_lock_irq(&sp->queue_lock);
- if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
- spin_unlock_irq(&sp->queue_lock);
- return;
- }
- rcu_cblist_init(&ready_cbs);
- rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
- spin_unlock_irq(&sp->queue_lock);
- rhp = rcu_cblist_dequeue(&ready_cbs);
- for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
+ for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+ head = rcu_batch_dequeue(&sp->batch_done);
+ if (!head)
+ break;
local_bh_disable();
- rhp->func(rhp);
+ head->func(head);
local_bh_enable();
}
- spin_lock_irq(&sp->queue_lock);
- rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
- spin_unlock_irq(&sp->queue_lock);
}
/*
* Finished one round of SRCU grace period. Start another if there are
* more SRCU callbacks queued, otherwise put SRCU into not-running state.
*/
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+static void srcu_reschedule(struct srcu_struct *sp)
{
bool pending = true;
- int state;
- if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+ if (rcu_batch_empty(&sp->batch_done) &&
+ rcu_batch_empty(&sp->batch_check1) &&
+ rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_queue)) {
spin_lock_irq(&sp->queue_lock);
- state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
- if (rcu_segcblist_empty(&sp->srcu_cblist) &&
- state == SRCU_STATE_IDLE)
+ if (rcu_batch_empty(&sp->batch_done) &&
+ rcu_batch_empty(&sp->batch_check1) &&
+ rcu_batch_empty(&sp->batch_check0) &&
+ rcu_batch_empty(&sp->batch_queue)) {
+ sp->running = false;
pending = false;
+ }
spin_unlock_irq(&sp->queue_lock);
}
if (pending)
- queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+ queue_delayed_work(system_power_efficient_wq,
+ &sp->work, SRCU_INTERVAL);
}
/*
@@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work)
sp = container_of(work, struct srcu_struct, work.work);
- srcu_advance_batches(sp);
+ srcu_collect_new(sp);
+ srcu_advance_batches(sp, 1);
srcu_invoke_callbacks(sp);
- srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+ srcu_reschedule(sp);
}
EXPORT_SYMBOL_GPL(process_srcu);