diff options
-rw-r--r-- | include/trace/events/rcu.h | 1 | ||||
-rw-r--r-- | kernel/rcu/rcu_segcblist.c | 12 | ||||
-rw-r--r-- | kernel/rcu/rcu_segcblist.h | 1 | ||||
-rw-r--r-- | kernel/rcu/tree.c | 116 | ||||
-rw-r--r-- | kernel/rcu/tree.h | 14 | ||||
-rw-r--r-- | kernel/rcu/tree_plugin.h | 510 |
6 files changed, 270 insertions, 384 deletions
diff --git a/include/trace/events/rcu.h b/include/trace/events/rcu.h index 313324d1b135..694bd040cf51 100644 --- a/include/trace/events/rcu.h +++ b/include/trace/events/rcu.h @@ -100,7 +100,6 @@ TRACE_EVENT_RCU(rcu_grace_period, * "Startedroot": Requested a nocb grace period based on root-node data. * "NoGPkthread": The RCU grace-period kthread has not yet started. * "StartWait": Start waiting for the requested grace period. - * "ResumeWait": Resume waiting after signal. * "EndWait": Complete wait. * "Cleanup": Clean up rcu_node structure after previous GP. * "CleanupMore": Clean up, and another GP is needed. diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c index 9ac28f175627..92968b856593 100644 --- a/kernel/rcu/rcu_segcblist.c +++ b/kernel/rcu/rcu_segcblist.c @@ -128,6 +128,18 @@ struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp) } /* + * Return false if there are no CBs awaiting grace periods, otherwise, + * return true and store the nearest waited-upon grace period into *lp. + */ +bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp) +{ + if (!rcu_segcblist_pend_cbs(rsclp)) + return false; + *lp = rsclp->gp_seq[RCU_WAIT_TAIL]; + return true; +} + +/* * Enqueue the specified callback onto the specified rcu_segcblist * structure, updating accounting as needed. Note that the ->len * field may be accessed locklessly, hence the WRITE_ONCE(). diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h index ed3fcece39a9..db38f0a512c4 100644 --- a/kernel/rcu/rcu_segcblist.h +++ b/kernel/rcu/rcu_segcblist.h @@ -89,6 +89,7 @@ bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp); bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp); struct rcu_head *rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp); +bool rcu_segcblist_nextgp(struct rcu_segcblist *rsclp, unsigned long *lp); void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp, struct rcu_head *rhp, bool lazy); bool rcu_segcblist_entrain(struct rcu_segcblist *rsclp, diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c index 2917ce379b23..054418d2d960 100644 --- a/kernel/rcu/tree.c +++ b/kernel/rcu/tree.c @@ -1343,8 +1343,10 @@ static bool rcu_advance_cbs(struct rcu_node *rnp, struct rcu_data *rdp) */ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) { - bool ret; + bool ret = false; bool need_gp; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); raw_lockdep_assert_held_rcu_node(rnp); @@ -1354,10 +1356,12 @@ static bool __note_gp_changes(struct rcu_node *rnp, struct rcu_data *rdp) /* Handle the ends of any preceding grace periods first. */ if (rcu_seq_completed_gp(rdp->gp_seq, rnp->gp_seq) || unlikely(READ_ONCE(rdp->gpwrap))) { - ret = rcu_advance_cbs(rnp, rdp); /* Advance callbacks. */ + if (!offloaded) + ret = rcu_advance_cbs(rnp, rdp); /* Advance CBs. */ trace_rcu_grace_period(rcu_state.name, rdp->gp_seq, TPS("cpuend")); } else { - ret = rcu_accelerate_cbs(rnp, rdp); /* Recent callbacks. */ + if (!offloaded) + ret = rcu_accelerate_cbs(rnp, rdp); /* Recent CBs. */ } /* Now handle the beginnings of any new-to-this-CPU grace periods. */ @@ -1658,6 +1662,7 @@ static void rcu_gp_cleanup(void) unsigned long gp_duration; bool needgp = false; unsigned long new_gp_seq; + bool offloaded; struct rcu_data *rdp; struct rcu_node *rnp = rcu_get_root(); struct swait_queue_head *sq; @@ -1723,7 +1728,9 @@ static void rcu_gp_cleanup(void) needgp = true; } /* Advance CBs to reduce false positives below. */ - if (!rcu_accelerate_cbs(rnp, rdp) && needgp) { + offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); + if ((offloaded || !rcu_accelerate_cbs(rnp, rdp)) && needgp) { WRITE_ONCE(rcu_state.gp_flags, RCU_GP_FLAG_INIT); rcu_state.gp_req_activity = jiffies; trace_rcu_grace_period(rcu_state.name, @@ -1917,7 +1924,9 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) { unsigned long flags; unsigned long mask; - bool needwake; + bool needwake = false; + const bool offloaded = IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + rcu_segcblist_is_offloaded(&rdp->cblist); struct rcu_node *rnp; rnp = rdp->mynode; @@ -1944,7 +1953,8 @@ rcu_report_qs_rdp(int cpu, struct rcu_data *rdp) * This GP can't end until cpu checks in, so all of our * callbacks can be processed during the next GP. */ - needwake = rcu_accelerate_cbs(rnp, rdp); + if (!offloaded) + needwake = rcu_accelerate_cbs(rnp, rdp); rcu_report_qs_rnp(mask, rnp, rnp->gp_seq, flags); /* ^^^ Released rnp->lock */ @@ -2082,7 +2092,6 @@ static void rcu_do_batch(struct rcu_data *rdp) struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl); long bl, count; - WARN_ON_ONCE(rdp->cblist.offloaded); /* If no callbacks are ready, just return. */ if (!rcu_segcblist_ready_cbs(&rdp->cblist)) { trace_rcu_batch_start(rcu_state.name, @@ -2101,13 +2110,14 @@ static void rcu_do_batch(struct rcu_data *rdp) * callback counts, as rcu_barrier() needs to be conservative. */ local_irq_save(flags); + rcu_nocb_lock(rdp); WARN_ON_ONCE(cpu_is_offline(smp_processor_id())); bl = rdp->blimit; trace_rcu_batch_start(rcu_state.name, rcu_segcblist_n_lazy_cbs(&rdp->cblist), rcu_segcblist_n_cbs(&rdp->cblist), bl); rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl); - local_irq_restore(flags); + rcu_nocb_unlock_irqrestore(rdp, flags); /* Invoke callbacks. */ rhp = rcu_cblist_dequeue(&rcl); @@ -2120,12 +2130,22 @@ static void rcu_do_batch(struct rcu_data *rdp) * Note: The rcl structure counts down from zero. */ if (-rcl.len >= bl && + !rcu_segcblist_is_offloaded(&rdp->cblist) && (need_resched() || (!is_idle_task(current) && !rcu_is_callbacks_kthread()))) break; + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + WARN_ON_ONCE(in_serving_softirq()); + local_bh_enable(); + lockdep_assert_irqs_enabled(); + cond_resched_tasks_rcu_qs(); + lockdep_assert_irqs_enabled(); + local_bh_disable(); + } } local_irq_save(flags); + rcu_nocb_lock(rdp); count = -rcl.len; trace_rcu_batch_end(rcu_state.name, count, !!rcl.head, need_resched(), is_idle_task(current), rcu_is_callbacks_kthread()); @@ -2153,10 +2173,11 @@ static void rcu_do_batch(struct rcu_data *rdp) */ WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0)); - local_irq_restore(flags); + rcu_nocb_unlock_irqrestore(rdp, flags); /* Re-invoke RCU core processing if there are callbacks remaining. */ - if (rcu_segcblist_ready_cbs(&rdp->cblist)) + if (!rcu_segcblist_is_offloaded(&rdp->cblist) && + rcu_segcblist_ready_cbs(&rdp->cblist)) invoke_rcu_core(); } @@ -2312,7 +2333,8 @@ static __latent_entropy void rcu_core(void) rcu_check_gp_start_stall(rnp, rdp, rcu_jiffies_till_stall_check()); /* If there are callbacks ready, invoke them. */ - if (rcu_segcblist_ready_cbs(&rdp->cblist) && + if (!rcu_segcblist_is_offloaded(&rdp->cblist) && + rcu_segcblist_ready_cbs(&rdp->cblist) && likely(READ_ONCE(rcu_scheduler_fully_active))) rcu_do_batch(rdp); @@ -2492,10 +2514,11 @@ static void rcu_leak_callback(struct rcu_head *rhp) * is expected to specify a CPU. */ static void -__call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) +__call_rcu(struct rcu_head *head, rcu_callback_t func, bool lazy) { unsigned long flags; struct rcu_data *rdp; + bool was_alldone; /* Misaligned rcu_head! */ WARN_ON_ONCE((unsigned long)head & (sizeof(void *) - 1)); @@ -2517,29 +2540,17 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) rdp = this_cpu_ptr(&rcu_data); /* Add the callback to our list. */ - if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || - rcu_segcblist_is_offloaded(&rdp->cblist) || cpu != -1) { - int offline; - - if (cpu != -1) - rdp = per_cpu_ptr(&rcu_data, cpu); - if (likely(rdp->mynode)) { - /* Post-boot, so this should be for a no-CBs CPU. */ - offline = !__call_rcu_nocb(rdp, head, lazy, flags); - WARN_ON_ONCE(offline); - /* Offline CPU, _call_rcu() illegal, leak callback. */ - local_irq_restore(flags); - return; - } - /* - * Very early boot, before rcu_init(). Initialize if needed - * and then drop through to queue the callback. - */ - WARN_ON_ONCE(cpu != -1); + if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist))) { + // This can trigger due to call_rcu() from offline CPU: + WARN_ON_ONCE(rcu_scheduler_active != RCU_SCHEDULER_INACTIVE); WARN_ON_ONCE(!rcu_is_watching()); + // Very early boot, before rcu_init(). Initialize if needed + // and then drop through to queue the callback. if (rcu_segcblist_empty(&rdp->cblist)) rcu_segcblist_init(&rdp->cblist); } + rcu_nocb_lock(rdp); + was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); rcu_segcblist_enqueue(&rdp->cblist, head, lazy); if (__is_kfree_rcu_offset((unsigned long)func)) trace_rcu_kfree_callback(rcu_state.name, head, @@ -2552,8 +2563,13 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) rcu_segcblist_n_cbs(&rdp->cblist)); /* Go handle any RCU core processing required. */ - __call_rcu_core(rdp, head, flags); - local_irq_restore(flags); + if (IS_ENABLED(CONFIG_RCU_NOCB_CPU) && + unlikely(rcu_segcblist_is_offloaded(&rdp->cblist))) { + __call_rcu_nocb_wake(rdp, was_alldone, flags); /* unlocks */ + } else { + __call_rcu_core(rdp, head, flags); + local_irq_restore(flags); + } } /** @@ -2593,7 +2609,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func, int cpu, bool lazy) */ void call_rcu(struct rcu_head *head, rcu_callback_t func) { - __call_rcu(head, func, -1, 0); + __call_rcu(head, func, 0); } EXPORT_SYMBOL_GPL(call_rcu); @@ -2606,7 +2622,7 @@ EXPORT_SYMBOL_GPL(call_rcu); */ void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func) { - __call_rcu(head, func, -1, 1); + __call_rcu(head, func, 1); } EXPORT_SYMBOL_GPL(kfree_call_rcu); @@ -2806,6 +2822,7 @@ static void rcu_barrier_func(void *unused) rcu_barrier_trace(TPS("IRQ"), -1, rcu_state.barrier_sequence); rdp->barrier_head.func = rcu_barrier_callback; debug_rcu_head_queue(&rdp->barrier_head); + rcu_nocb_lock(rdp); if (rcu_segcblist_entrain(&rdp->cblist, &rdp->barrier_head, 0)) { atomic_inc(&rcu_state.barrier_cpu_count); } else { @@ -2813,6 +2830,7 @@ static void rcu_barrier_func(void *unused) rcu_barrier_trace(TPS("IRQNQ"), -1, rcu_state.barrier_sequence); } + rcu_nocb_unlock(rdp); } /** @@ -2867,19 +2885,7 @@ void rcu_barrier(void) if (!cpu_online(cpu) && !rcu_segcblist_is_offloaded(&rdp->cblist)) continue; - if (rcu_segcblist_is_offloaded(&rdp->cblist)) { - if (!rcu_nocb_cpu_needs_barrier(cpu)) { - rcu_barrier_trace(TPS("OfflineNoCB"), cpu, - rcu_state.barrier_sequence); - } else { - rcu_barrier_trace(TPS("OnlineNoCB"), cpu, - rcu_state.barrier_sequence); - smp_mb__before_atomic(); - atomic_inc(&rcu_state.barrier_cpu_count); - __call_rcu(&rdp->barrier_head, - rcu_barrier_callback, cpu, 0); - } - } else if (rcu_segcblist_n_cbs(&rdp->cblist)) { + if (rcu_segcblist_n_cbs(&rdp->cblist)) { rcu_barrier_trace(TPS("OnlineQ"), cpu, rcu_state.barrier_sequence); smp_call_function_single(cpu, rcu_barrier_func, NULL, 1); @@ -3169,10 +3175,7 @@ void rcutree_migrate_callbacks(int cpu) local_irq_save(flags); my_rdp = this_cpu_ptr(&rcu_data); my_rnp = my_rdp->mynode; - if (rcu_nocb_adopt_orphan_cbs(my_rdp, rdp, flags)) { - local_irq_restore(flags); - return; - } + rcu_nocb_lock(my_rdp); /* irqs already disabled. */ raw_spin_lock_rcu_node(my_rnp); /* irqs already disabled. */ /* Leverage recent GPs and set GP for new callbacks. */ needwake = rcu_advance_cbs(my_rnp, rdp) || @@ -3180,9 +3183,16 @@ void rcutree_migrate_callbacks(int cpu) rcu_segcblist_merge(&my_rdp->cblist, &rdp->cblist); WARN_ON_ONCE(rcu_segcblist_empty(&my_rdp->cblist) != !rcu_segcblist_n_cbs(&my_rdp->cblist)); - raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags); + if (rcu_segcblist_is_offloaded(&my_rdp->cblist)) { + raw_spin_unlock_rcu_node(my_rnp); /* irqs remain disabled. */ + __call_rcu_nocb_wake(my_rdp, true, flags); + } else { + rcu_nocb_unlock(my_rdp); /* irqs remain disabled. */ + raw_spin_unlock_irqrestore_rcu_node(my_rnp, flags); + } if (needwake) rcu_gp_kthread_wake(); + lockdep_assert_irqs_enabled(); WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 || !rcu_segcblist_empty(&rdp->cblist), "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n", diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h index 8d9cfcac6757..529eec2aa74d 100644 --- a/kernel/rcu/tree.h +++ b/kernel/rcu/tree.h @@ -211,7 +211,9 @@ struct rcu_data { /* CBs waiting for GP. */ struct rcu_head **nocb_gp_tail; bool nocb_gp_sleep; /* Is the nocb GP thread asleep? */ + bool nocb_gp_forced; /* Forced nocb GP thread wakeup? */ struct swait_queue_head nocb_gp_wq; /* For nocb kthreads to sleep on. */ + bool nocb_cb_sleep; /* Is the nocb CB thread asleep? */ struct task_struct *nocb_cb_kthread; struct rcu_data *nocb_next_cb_rdp; /* Next rcu_data in wakeup chain. */ @@ -421,20 +423,20 @@ static bool rcu_preempt_has_tasks(struct rcu_node *rnp); static bool rcu_preempt_need_deferred_qs(struct task_struct *t); static void rcu_preempt_deferred_qs(struct task_struct *t); static void zero_cpu_stall_ticks(struct rcu_data *rdp); -static bool rcu_nocb_cpu_needs_barrier(int cpu); static struct swait_queue_head *rcu_nocb_gp_get(struct rcu_node *rnp); static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq); static void rcu_init_one_nocb(struct rcu_node *rnp); -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags); -static bool rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags); +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, + unsigned long flags); static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp); static void do_nocb_deferred_wakeup(struct rcu_data *rdp); static void rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp); static void rcu_spawn_cpu_nocb_kthread(int cpu); static void __init rcu_spawn_nocb_kthreads(void); +static void rcu_nocb_lock(struct rcu_data *rdp); +static void rcu_nocb_unlock(struct rcu_data *rdp); +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags); #ifdef CONFIG_RCU_NOCB_CPU static void __init rcu_organize_nocb_kthreads(void); #endif /* #ifdef CONFIG_RCU_NOCB_CPU */ diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h index 2d37fd3fa0d4..feffc46cccb0 100644 --- a/kernel/rcu/tree_plugin.h +++ b/kernel/rcu/tree_plugin.h @@ -1495,6 +1495,45 @@ static int __init parse_rcu_nocb_poll(char *arg) early_param("rcu_nocb_poll", parse_rcu_nocb_poll); /* + * Acquire the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_lock(struct rcu_data *rdp) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_lock(&rdp->nocb_lock); + } +} + +/* + * Release the specified rcu_data structure's ->nocb_lock, but only + * if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock(&rdp->nocb_lock); + } +} + +/* + * Release the specified rcu_data structure's ->nocb_lock and restore + * interrupts, but only if it corresponds to a no-CBs CPU. + */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + if (rcu_segcblist_is_offloaded(&rdp->cblist)) { + lockdep_assert_irqs_disabled(); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + } else { + local_irq_restore(flags); + } +} + +/* * Wake up any no-CBs CPUs' kthreads that were waiting on the just-ended * grace period. */ @@ -1526,7 +1565,7 @@ bool rcu_is_nocb_cpu(int cpu) * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock * and this function releases it. */ -static void __wake_nocb_gp(struct rcu_data *rdp, bool force, +static void wake_nocb_gp(struct rcu_data *rdp, bool force, unsigned long flags) __releases(rdp->nocb_lock) { @@ -1537,326 +1576,168 @@ static void __wake_nocb_gp(struct rcu_data *rdp, bool force, raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); return; } - if (rdp_gp->nocb_gp_sleep || force) { - /* Prior smp_mb__after_atomic() orders against prior enqueue. */ - WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); + if (READ_ONCE(rdp_gp->nocb_gp_sleep) || force) { del_timer(&rdp->nocb_timer); raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - smp_mb(); /* ->nocb_gp_sleep before swake_up_one(). */ - swake_up_one(&rdp_gp->nocb_gp_wq); + smp_mb(); /* enqueue before ->nocb_gp_sleep. */ + raw_spin_lock_irqsave(&rdp_gp->nocb_lock, flags); + WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); + raw_spin_unlock_irqrestore(&rdp_gp->nocb_lock, flags); + wake_up_process(rdp_gp->nocb_gp_kthread); } else { raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); } } /* - * Kick the GP kthread for this NOCB group, but caller has not - * acquired locks. - */ -static void wake_nocb_gp(struct rcu_data *rdp, bool force) -{ - unsigned long flags; - - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - __wake_nocb_gp(rdp, force, flags); -} - -/* * Arrange to wake the GP kthread for this NOCB group at some future * time when it is safe to do so. */ static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, const char *reason) { - unsigned long flags; - - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) mod_timer(&rdp->nocb_timer, jiffies + 1); WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); -} - -/* Does rcu_barrier need to queue an RCU callback on the specified CPU? */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) -{ - struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu); - unsigned long ret; -#ifdef CONFIG_PROVE_RCU - struct rcu_head *rhp; -#endif /* #ifdef CONFIG_PROVE_RCU */ - - /* - * Check count of all no-CBs callbacks awaiting invocation. - * There needs to be a barrier before this function is called, - * but associated with a prior determination that no more - * callbacks would be posted. In the worst case, the first - * barrier in rcu_barrier() suffices (but the caller cannot - * necessarily rely on this, not a substitute for the caller - * getting the concurrency design right!). There must also be a - * barrier between the following load and posting of a callback - * (if a callback is in fact needed). This is associated with an - * atomic_inc() in the caller. - */ - ret = rcu_get_n_cbs_nocb_cpu(rdp); - -#ifdef CONFIG_PROVE_RCU - rhp = READ_ONCE(rdp->nocb_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_gp_head); - if (!rhp) - rhp = READ_ONCE(rdp->nocb_cb_head); - - /* Having no rcuo kthread but CBs after scheduler starts is bad! */ - if (!READ_ONCE(rdp->nocb_cb_kthread) && rhp && - rcu_scheduler_fully_active) { - /* RCU callback enqueued before CPU first came online??? */ - pr_err("RCU: Never-onlined no-CBs CPU %d has CB %p\n", - cpu, rhp->func); - WARN_ON_ONCE(1); - } -#endif /* #ifdef CONFIG_PROVE_RCU */ - - return !!ret; } /* - * Enqueue the specified string of rcu_head structures onto the specified - * CPU's no-CBs lists. The CPU is specified by rdp, the head of the - * string by rhp, and the tail of the string by rhtp. The non-lazy/lazy - * counts are supplied by rhcount and rhcount_lazy. + * Awaken the no-CBs grace-period kthead if needed, either due to it + * legitimately being asleep or due to overload conditions. * * If warranted, also wake up the kthread servicing this CPUs queues. */ -static void __call_rcu_nocb_enqueue(struct rcu_data *rdp, - struct rcu_head *rhp, - struct rcu_head **rhtp, - int rhcount, int rhcount_lazy, - unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, + unsigned long flags) + __releases(rdp->nocb_lock) { int len; - struct rcu_head **old_rhpp; struct task_struct *t; - /* Enqueue the callback on the nocb list and update counts. */ - atomic_long_add(rhcount, &rdp->nocb_q_count); - /* rcu_barrier() relies on ->nocb_q_count add before xchg. */ - old_rhpp = xchg(&rdp->nocb_tail, rhtp); - WRITE_ONCE(*old_rhpp, rhp); - atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy); - smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */ - - /* If we are not being polled and there is a kthread, awaken it ... */ + // If we are being polled or there is no kthread, just leave. t = READ_ONCE(rdp->nocb_gp_kthread); if (rcu_nocb_poll || !t) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNotPoll")); + rcu_nocb_unlock_irqrestore(rdp, flags); return; } - len = rcu_get_n_cbs_nocb_cpu(rdp); - if (old_rhpp == &rdp->nocb_head) { + // Need to actually to a wakeup. + len = rcu_segcblist_n_cbs(&rdp->cblist); + if (was_alldone) { if (!irqs_disabled_flags(flags)) { /* ... if queue was empty ... */ - wake_nocb_gp(rdp, false); + wake_nocb_gp(rdp, false, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeEmpty")); } else { wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, TPS("WakeEmptyIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } rdp->qlen_last_fqs_check = 0; } else if (len > rdp->qlen_last_fqs_check + qhimark) { /* ... or if many callbacks queued. */ if (!irqs_disabled_flags(flags)) { - wake_nocb_gp(rdp, true); + wake_nocb_gp(rdp, true, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeOvf")); } else { wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, TPS("WakeOvfIsDeferred")); + rcu_nocb_unlock_irqrestore(rdp, flags); } rdp->qlen_last_fqs_check = LONG_MAX / 2; } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); + rcu_nocb_unlock_irqrestore(rdp, flags); } + if (!irqs_disabled_flags(flags)) + lockdep_assert_irqs_enabled(); return; } /* - * This is a helper for __call_rcu(), which invokes this when the normal - * callback queue is inoperable. If this is not a no-CBs CPU, this - * function returns failure back to __call_rcu(), which can complain - * appropriately. - * - * Otherwise, this function queues the callback where the corresponding - * "rcuo" kthread can find it. - */ -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) -{ - - if (!rcu_segcblist_is_offloaded(&rdp->cblist)) - return false; - __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); - if (__is_kfree_rcu_offset((unsigned long)rhp->func)) - trace_rcu_kfree_callback(rcu_state.name, rhp, - (unsigned long)rhp->func, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - else - trace_rcu_callback(rcu_state.name, rhp, - -atomic_long_read(&rdp->nocb_q_count_lazy), - -rcu_get_n_cbs_nocb_cpu(rdp)); - - return true; -} - -/* - * Adopt orphaned callbacks on a no-CBs CPU, or return 0 if this is - * not a no-CBs CPU. - */ -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) -{ - lockdep_assert_irqs_disabled(); - if (!rcu_segcblist_is_offloaded(&my_rdp->cblist)) - return false; /* Not NOCBs CPU, caller must migrate CBs. */ - __call_rcu_nocb_enqueue(my_rdp, rcu_segcblist_head(&rdp->cblist), - rcu_segcblist_tail(&rdp->cblist), - rcu_segcblist_n_cbs(&rdp->cblist), - rcu_segcblist_n_lazy_cbs(&rdp->cblist), flags); - rcu_segcblist_init(&rdp->cblist); - rcu_segcblist_disable(&rdp->cblist); - return true; -} - -/* - * If necessary, kick off a new grace period, and either way wait - * for a subsequent grace period to complete. - */ -static void rcu_nocb_wait_gp(struct rcu_data *rdp) -{ - unsigned long c; - bool d; - unsigned long flags; - bool needwake; - struct rcu_node *rnp = rdp->mynode; - - local_irq_save(flags); - c = rcu_seq_snap(&rcu_state.gp_seq); - if (!rdp->gpwrap && ULONG_CMP_GE(rdp->gp_seq_needed, c)) { - local_irq_restore(flags); - } else { - raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ - needwake = rcu_start_this_gp(rnp, rdp, c); - raw_spin_unlock_irqrestore_rcu_node(rnp, flags); - if (needwake) - rcu_gp_kthread_wake(); - } - - /* - * Wait for the grace period. Do so interruptibly to avoid messing - * up the load average. - */ - trace_rcu_this_gp(rnp, rdp, c, TPS("StartWait")); - for (;;) { - swait_event_interruptible_exclusive( - rnp->nocb_gp_wq[rcu_seq_ctr(c) & 0x1], - (d = rcu_seq_done(&rnp->gp_seq, c))); - if (likely(d)) - break; - WARN_ON(signal_pending(current)); - trace_rcu_this_gp(rnp, rdp, c, TPS("ResumeWait")); - } - trace_rcu_this_gp(rnp, rdp, c, TPS("EndWait")); - smp_mb(); /* Ensure that CB invocation happens after GP end. */ -} - -/* - * No-CBs GP kthreads come here to wait for additional callbacks to show up. - * This function does not return until callbacks appear. + * No-CBs GP kthreads come here to wait for additional callbacks to show up + * or for grace periods to end. */ static void nocb_gp_wait(struct rcu_data *my_rdp) { - bool firsttime = true; + int __maybe_unused cpu = my_rdp->cpu; + unsigned long cur_gp_seq; unsigned long flags; bool gotcbs; + bool needwait_gp = false; + bool needwake; + bool needwake_gp; struct rcu_data *rdp; - struct rcu_head **tail; - - /* Wait for callbacks to appear. */ - if (!rcu_nocb_poll) { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Sleep")); - swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, - !READ_ONCE(my_rdp->nocb_gp_sleep)); - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_gp_sleep = true; - WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - del_timer(&my_rdp->nocb_timer); - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); - } else if (firsttime) { - firsttime = false; /* Don't drown trace log with "Poll"! */ - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, TPS("Poll")); - } + struct rcu_node *rnp; + unsigned long wait_gp_seq; /* - * Each pass through the following loop checks for CBs. - * We are our own first CB kthread. Any CBs found are moved to - * nocb_gp_head, where they await a grace period. + * Each pass through the following loop checks for CBs and for the + * nearest grace period (if any) to wait for next. The CB kthreads + * and the global grace-period kthread are awakened if needed. */ - gotcbs = false; - smp_mb(); /* wakeup and _sleep before ->nocb_head reads. */ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { - rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); - if (!rdp->nocb_gp_head) - continue; /* No CBs here, try next. */ - - /* Move callbacks to wait-for-GP list, which is empty. */ - WRITE_ONCE(rdp->nocb_head, NULL); - rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); - gotcbs = true; - } - - /* No callbacks? Sleep a bit if polling, and go retry. */ - if (unlikely(!gotcbs)) { - WARN_ON(signal_pending(current)); - if (rcu_nocb_poll) { - schedule_timeout_interruptible(1); - } else { - trace_rcu_nocb_wake(rcu_state.name, my_rdp->cpu, - TPS("WokeEmpty")); + if (rcu_segcblist_empty(&rdp->cblist)) + continue; /* No callbacks here, try next. */ + rnp = rdp->mynode; + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); + del_timer(&my_rdp->nocb_timer); + raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ + needwake_gp = rcu_advance_cbs(rnp, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ + // Need to wait on some grace period? + if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) { + if (!needwait_gp || + ULONG_CMP_LT(cur_gp_seq, wait_gp_seq)) + wait_gp_seq = cur_gp_seq; + needwait_gp = true; } - return; - } - - /* Wait for one grace period. */ - rcu_nocb_wait_gp(my_rdp); - - /* Each pass through this loop wakes a CB kthread, if needed. */ - for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { - if (!rcu_nocb_poll && - READ_ONCE(rdp->nocb_head) && - READ_ONCE(my_rdp->nocb_gp_sleep)) { - raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); - my_rdp->nocb_gp_sleep = false;/* No need to sleep.*/ - raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + needwake = rdp->nocb_cb_sleep; + WRITE_ONCE(rdp->nocb_cb_sleep, false); + smp_mb(); /* CB invocation -after- GP end. */ + } else { + needwake = false; } - if (!rdp->nocb_gp_head) - continue; /* No CBs, so no need to wake kthread. */ - - /* Append callbacks to CB kthread's "done" list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - tail = rdp->nocb_cb_tail; - rdp->nocb_cb_tail = rdp->nocb_gp_tail; - *tail = rdp->nocb_gp_head; raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (tail == &rdp->nocb_cb_head) { - /* List was empty, so wake up the kthread. */ + if (needwake) { swake_up_one(&rdp->nocb_cb_wq); + gotcbs = true; } + if (needwake_gp) + rcu_gp_kthread_wake(); + } + + if (rcu_nocb_poll) { + /* Polling, so trace if first poll in the series. */ + if (gotcbs) + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); + schedule_timeout_interruptible(1); + } else if (!needwait_gp) { + /* Wait for callbacks to appear. */ + trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); + swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, + !READ_ONCE(my_rdp->nocb_gp_sleep)); + } else { + rnp = my_rdp->mynode; + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait")); + swait_event_interruptible_exclusive( + rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1], + rcu_seq_done(&rnp->gp_seq, wait_gp_seq) || + !READ_ONCE(my_rdp->nocb_gp_sleep)); + trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait")); + } + if (!rcu_nocb_poll) { + raw_spin_lock_irqsave(&my_rdp->nocb_lock, flags); + WRITE_ONCE(my_rdp->nocb_gp_sleep, true); + raw_spin_unlock_irqrestore(&my_rdp->nocb_lock, flags); } + WARN_ON(signal_pending(current)); } /* @@ -1871,92 +1752,69 @@ static int rcu_nocb_gp_kthread(void *arg) { struct rcu_data *rdp = arg; - for (;;) + for (;;) { nocb_gp_wait(rdp); + cond_resched_tasks_rcu_qs(); + } return 0; } /* - * No-CBs CB kthreads come here to wait for additional callbacks to show up. - * This function returns true ("keep waiting") until callbacks appear and - * then false ("stop waiting") when callbacks finally do appear. + * Invoke any ready callbacks from the corresponding no-CBs CPU, + * then, if there are no more, wait for more to appear. */ -static bool nocb_cb_wait(struct rcu_data *rdp) +static void nocb_cb_wait(struct rcu_data *rdp) { + unsigned long flags; + bool needwake_gp = false; + struct rcu_node *rnp = rdp->mynode; + + local_irq_save(flags); + rcu_momentary_dyntick_idle(); + local_irq_restore(flags); + local_bh_disable(); + rcu_do_batch(rdp); + local_bh_enable(); + lockdep_assert_irqs_enabled(); + raw_spin_lock_irqsave(&rdp->nocb_lock, flags); + raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */ + needwake_gp = rcu_advance_cbs(rdp->mynode, rdp); + raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ + if (rcu_segcblist_ready_cbs(&rdp->cblist)) { + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); + return; + } + trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); + WRITE_ONCE(rdp->nocb_cb_sleep, true); + raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); + if (needwake_gp) + rcu_gp_kthread_wake(); swait_event_interruptible_exclusive(rdp->nocb_cb_wq, - READ_ONCE(rdp->nocb_cb_head)); - if (smp_load_acquire(&rdp->nocb_cb_head)) { /* VVV */ - /* ^^^ Ensure CB invocation follows _head test. */ - return false; + !READ_ONCE(rdp->nocb_cb_sleep)); + if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */ + /* ^^^ Ensure CB invocation follows _sleep test. */ + return; } WARN_ON(signal_pending(current)); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); - return true; } /* - * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes - * callbacks queued by the corresponding no-CBs CPU, however, there is an - * optional GP-CB relationship so that the grace-period kthreads don't - * have to do quite so many wakeups (as in they only need to wake the - * no-CBs GP kthreads, not the CB kthreads). + * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke + * nocb_cb_wait() to do the dirty work. */ static int rcu_nocb_cb_kthread(void *arg) { - int c, cl; - unsigned long flags; - struct rcu_head *list; - struct rcu_head *next; - struct rcu_head **tail; struct rcu_data *rdp = arg; - /* Each pass through this loop invokes one batch of callbacks */ + // Each pass through this loop does one callback batch, and, + // if there are no more ready callbacks, waits for them. for (;;) { - /* Wait for callbacks. */ - while (nocb_cb_wait(rdp)) - continue; - - /* Pull the ready-to-invoke callbacks onto local list. */ - raw_spin_lock_irqsave(&rdp->nocb_lock, flags); - list = rdp->nocb_cb_head; - rdp->nocb_cb_head = NULL; - tail = rdp->nocb_cb_tail; - rdp->nocb_cb_tail = &rdp->nocb_cb_head; - raw_spin_unlock_irqrestore(&rdp->nocb_lock, flags); - if (WARN_ON_ONCE(!list)) - continue; - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeNonEmpty")); - - /* Each pass through the following loop invokes a callback. */ - trace_rcu_batch_start(rcu_state.name, - atomic_long_read(&rdp->nocb_q_count_lazy), - rcu_get_n_cbs_nocb_cpu(rdp), -1); - c = cl = 0; - while (list) { - next = list->next; - /* Wait for enqueuing to complete, if needed. */ - while (next == NULL && &list->next != tail) { - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WaitQueue")); - schedule_timeout_interruptible(1); - trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, - TPS("WokeQueue")); - next = list->next; - } - debug_rcu_head_unqueue(list); - local_bh_disable(); - if (__rcu_reclaim(rcu_state.name, list)) - cl++; - c++; - local_bh_enable(); - cond_resched_tasks_rcu_qs(); - list = next; - } - trace_rcu_batch_end(rcu_state.name, c, !!list, 0, 0, 1); - smp_mb__before_atomic(); /* _add after CB invocation. */ - atomic_long_add(-c, &rdp->nocb_q_count); - atomic_long_add(-cl, &rdp->nocb_q_count_lazy); + nocb_cb_wait(rdp); + cond_resched_tasks_rcu_qs(); } return 0; } @@ -1980,7 +1838,7 @@ static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp) } ndw = READ_ONCE(rdp->nocb_defer_wakeup); WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); - __wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); + wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake")); } @@ -2194,10 +2052,21 @@ static unsigned long rcu_get_n_cbs_nocb_cpu(struct rcu_data *rdp) #else /* #ifdef CONFIG_RCU_NOCB_CPU */ -static bool rcu_nocb_cpu_needs_barrier(int cpu) +/* No ->nocb_lock to acquire. */ +static void rcu_nocb_lock(struct rcu_data *rdp) { - WARN_ON_ONCE(1); /* Should be dead code. */ - return false; +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock(struct rcu_data *rdp) +{ +} + +/* No ->nocb_lock to release. */ +static void rcu_nocb_unlock_irqrestore(struct rcu_data *rdp, + unsigned long flags) +{ + local_irq_restore(flags); } static void rcu_nocb_gp_cleanup(struct swait_queue_head *sq) @@ -2213,17 +2082,10 @@ static void rcu_init_one_nocb(struct rcu_node *rnp) { } -static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, - bool lazy, unsigned long flags) +static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_empty, + unsigned long flags) { - return false; -} - -static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_data *my_rdp, - struct rcu_data *rdp, - unsigned long flags) -{ - return false; + WARN_ON_ONCE(1); /* Should be dead code! */ } static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp) |