diff options
Diffstat (limited to 'fs/dlm')
-rw-r--r-- | fs/dlm/dlm_internal.h | 16 | ||||
-rw-r--r-- | fs/dlm/lock.c | 382 | ||||
-rw-r--r-- | fs/dlm/lock.h | 2 | ||||
-rw-r--r-- | fs/dlm/lockspace.c | 105 | ||||
-rw-r--r-- | fs/dlm/member.c | 2 | ||||
-rw-r--r-- | fs/dlm/recover.c | 5 | ||||
-rw-r--r-- | fs/dlm/recoverd.c | 10 |
7 files changed, 283 insertions, 239 deletions
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h index cf43b97cf3e5..98a0ac511bc8 100644 --- a/fs/dlm/dlm_internal.h +++ b/fs/dlm/dlm_internal.h @@ -334,6 +334,7 @@ struct dlm_rsb { struct list_head res_root_list; /* used for recovery */ struct list_head res_masters_list; /* used for recovery */ struct list_head res_recover_list; /* used for recovery */ + struct list_head res_toss_q_list; int res_recover_locks_count; char *res_lvbptr; @@ -584,13 +585,20 @@ struct dlm_ls { spinlock_t ls_lkbidr_spin; struct rhashtable ls_rsbtbl; -#define DLM_RTF_SHRINK_BIT 0 - unsigned long ls_rsbtbl_flags; spinlock_t ls_rsbtbl_lock; struct list_head ls_toss; struct list_head ls_keep; + struct timer_list ls_timer; + /* this queue is ordered according the + * absolute res_toss_time jiffies time + * to mod_timer() with the first element + * if necessary. + */ + struct list_head ls_toss_q; + spinlock_t ls_toss_q_lock; + spinlock_t ls_waiters_lock; struct list_head ls_waiters; /* lkbs needing a reply */ @@ -601,9 +609,6 @@ struct dlm_ls { int ls_new_rsb_count; struct list_head ls_new_rsb; /* new rsb structs */ - char *ls_remove_names[DLM_REMOVE_NAMES_MAX]; - int ls_remove_lens[DLM_REMOVE_NAMES_MAX]; - struct list_head ls_nodes; /* current nodes in ls */ struct list_head ls_nodes_gone; /* dead node list, recovery */ int ls_num_nodes; /* number of nodes in ls */ @@ -640,7 +645,6 @@ struct dlm_ls { spinlock_t ls_cb_lock; struct list_head ls_cb_delay; /* save for queue_work later */ - struct timer_list ls_timer; struct task_struct *ls_recoverd_task; struct mutex ls_recoverd_active; spinlock_t ls_recover_lock; diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c index fee1a4164fc1..7c97181a04fe 100644 --- a/fs/dlm/lock.c +++ b/fs/dlm/lock.c @@ -320,6 +320,11 @@ static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode) * Basic operations on rsb's and lkb's */ +static inline unsigned long rsb_toss_jiffies(void) +{ + return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ); +} + /* This is only called to add a reference when the code already holds a valid reference to the rsb, so there's no need for locking. */ @@ -416,6 +421,229 @@ static int pre_rsb_struct(struct dlm_ls *ls) return 0; } +/* connected with timer_delete_sync() in dlm_ls_stop() to stop + * new timers when recovery is triggered and don't run them + * again until a dlm_timer_resume() tries it again. + */ +static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies) +{ + if (!dlm_locking_stopped(ls)) + mod_timer(&ls->ls_timer, jiffies); +} + +/* This function tries to resume the timer callback if a rsb + * is on the toss list and no timer is pending. It might that + * the first entry is on currently executed as timer callback + * but we don't care if a timer queued up again and does + * nothing. Should be a rare case. + */ +void dlm_timer_resume(struct dlm_ls *ls) +{ + struct dlm_rsb *r; + + spin_lock_bh(&ls->ls_toss_q_lock); + r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, + res_toss_q_list); + if (r && !timer_pending(&ls->ls_timer)) + __rsb_mod_timer(ls, r->res_toss_time); + spin_unlock_bh(&ls->ls_toss_q_lock); +} + +/* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */ +static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r) +{ + struct dlm_rsb *first; + + spin_lock_bh(&ls->ls_toss_q_lock); + r->res_toss_time = 0; + + /* if the rsb is not queued do nothing */ + if (list_empty(&r->res_toss_q_list)) + goto out; + + /* get the first element before delete */ + first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb, + res_toss_q_list); + list_del_init(&r->res_toss_q_list); + /* check if the first element was the rsb we deleted */ + if (first == r) { + /* try to get the new first element, if the list + * is empty now try to delete the timer, if we are + * too late we don't care. + * + * if the list isn't empty and a new first element got + * in place, set the new timer expire time. + */ + first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, + res_toss_q_list); + if (!first) + timer_delete(&ls->ls_timer); + else + __rsb_mod_timer(ls, first->res_toss_time); + } + +out: + spin_unlock_bh(&ls->ls_toss_q_lock); +} + +/* Caller must held ls_rsbtbl_lock and need to be called every time + * when either the rsb enters toss state or the toss state changes + * the dir/master nodeid. + */ +static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r) +{ + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *first; + + /* If we're the directory record for this rsb, and + * we're not the master of it, then we need to wait + * for the master node to send us a dir remove for + * before removing the dir record. + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)) { + rsb_delete_toss_timer(ls, r); + return; + } + + spin_lock_bh(&ls->ls_toss_q_lock); + /* set the new rsb absolute expire time in the rsb */ + r->res_toss_time = rsb_toss_jiffies(); + if (list_empty(&ls->ls_toss_q)) { + /* if the queue is empty add the element and it's + * our new expire time + */ + list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + __rsb_mod_timer(ls, r->res_toss_time); + } else { + /* check if the rsb was already queued, if so delete + * it from the toss queue + */ + if (!list_empty(&r->res_toss_q_list)) + list_del(&r->res_toss_q_list); + + /* try to get the maybe new first element and then add + * to this rsb with the oldest expire time to the end + * of the queue. If the list was empty before this + * rsb expire time is our next expiration if it wasn't + * the now new first elemet is our new expiration time + */ + first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, + res_toss_q_list); + list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q); + if (!first) + __rsb_mod_timer(ls, r->res_toss_time); + else + __rsb_mod_timer(ls, first->res_toss_time); + } + spin_unlock_bh(&ls->ls_toss_q_lock); +} + +/* if we hit contention we do in 250 ms a retry to trylock. + * if there is any other mod_timer in between we don't care + * about that it expires earlier again this is only for the + * unlikely case nothing happened in this time. + */ +#define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250)) + +void dlm_rsb_toss_timer(struct timer_list *timer) +{ + struct dlm_ls *ls = from_timer(ls, timer, ls_timer); + int our_nodeid = dlm_our_nodeid(); + struct dlm_rsb *r; + int rv; + + while (1) { + /* interrupting point to leave iteration when + * recovery waits for timer_delete_sync(), recovery + * will take care to delete everything in toss queue. + */ + if (dlm_locking_stopped(ls)) + break; + + rv = spin_trylock(&ls->ls_toss_q_lock); + if (!rv) { + /* rearm again try timer */ + __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb, + res_toss_q_list); + if (!r) { + /* nothing to do anymore next rsb queue will + * set next mod_timer() expire. + */ + spin_unlock(&ls->ls_toss_q_lock); + break; + } + + /* test if the first rsb isn't expired yet, if + * so we stop freeing rsb from toss queue as + * the order in queue is ascending to the + * absolute res_toss_time jiffies + */ + if (time_before(jiffies, r->res_toss_time)) { + /* rearm with the next rsb to expire in the future */ + __rsb_mod_timer(ls, r->res_toss_time); + spin_unlock(&ls->ls_toss_q_lock); + break; + } + + /* in find_rsb_dir/nodir there is a reverse order of this + * lock, however this is only a trylock if we hit some + * possible contention we try it again. + * + * This lock synchronized while holding ls_toss_q_lock + * synchronize everything that rsb_delete_toss_timer() + * or rsb_mod_timer() can't run after this timer callback + * deletes the rsb from the ls_toss_q. Whereas the other + * holders have always a priority to run as this is only + * a caching handling and the other holders might to put + * this rsb out of the toss state. + */ + rv = spin_trylock(&ls->ls_rsbtbl_lock); + if (!rv) { + spin_unlock(&ls->ls_toss_q_lock); + /* rearm again try timer */ + __rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY); + break; + } + + list_del(&r->res_rsbs_list); + rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, + dlm_rhash_rsb_params); + + /* not necessary to held the ls_rsbtbl_lock when + * calling send_remove() + */ + spin_unlock(&ls->ls_rsbtbl_lock); + + /* remove the rsb out of the toss queue its gone + * drom DLM now + */ + list_del_init(&r->res_toss_q_list); + spin_unlock(&ls->ls_toss_q_lock); + + /* no rsb in this state should ever run a timer */ + WARN_ON(!dlm_no_directory(ls) && + (r->res_master_nodeid != our_nodeid) && + (dlm_dir_nodeid(r) == our_nodeid)); + + /* We're the master of this rsb but we're not + * the directory record, so we need to tell the + * dir node to remove the dir record + */ + if (!dlm_no_directory(ls) && + (r->res_master_nodeid == our_nodeid) && + (dlm_dir_nodeid(r) != our_nodeid)) + send_remove(r); + + free_toss_rsb(r); + } +} + /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can unlock any spinlocks, go back and call pre_rsb_struct again. Otherwise, take an rsb off the list and return it. */ @@ -451,6 +679,7 @@ static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len, INIT_LIST_HEAD(&r->res_convertqueue); INIT_LIST_HEAD(&r->res_waitqueue); INIT_LIST_HEAD(&r->res_root_list); + INIT_LIST_HEAD(&r->res_toss_q_list); INIT_LIST_HEAD(&r->res_recover_list); INIT_LIST_HEAD(&r->res_masters_list); @@ -638,6 +867,9 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len, * valid for keep state rsbs */ kref_init(&r->res_ref); + rsb_delete_toss_timer(ls, r); + spin_unlock_bh(&ls->ls_rsbtbl_lock); + goto out_unlock; @@ -777,6 +1009,9 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len, * valid for keep state rsbs */ kref_init(&r->res_ref); + rsb_delete_toss_timer(ls, r); + spin_unlock_bh(&ls->ls_rsbtbl_lock); + goto out_unlock; @@ -1050,7 +1285,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags, r_nodeid, result); - r->res_toss_time = jiffies; + rsb_mod_timer(ls, r); /* the rsb was inactive (on toss list) */ spin_unlock_bh(&ls->ls_rsbtbl_lock); @@ -1070,7 +1305,6 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, r->res_master_nodeid = from_nodeid; r->res_nodeid = from_nodeid; kref_init(&r->res_ref); - r->res_toss_time = jiffies; rsb_set_flag(r, RSB_TOSS); error = rsb_insert(r, &ls->ls_rsbtbl); @@ -1082,6 +1316,7 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, } list_add(&r->res_rsbs_list, &ls->ls_toss); + rsb_mod_timer(ls, r); if (result) *result = DLM_LU_ADD; @@ -1126,8 +1361,8 @@ static void toss_rsb(struct kref *kref) DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r);); rsb_set_flag(r, RSB_TOSS); list_move(&r->res_rsbs_list, &ls->ls_toss); - r->res_toss_time = jiffies; - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags); + rsb_mod_timer(ls, r); + if (r->res_lvbptr) { dlm_free_lvb(r->res_lvbptr); r->res_lvbptr = NULL; @@ -1150,15 +1385,12 @@ void free_toss_rsb(struct dlm_rsb *r) { WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS)); - /* check if all work is done after the rsb is on toss list - * and it can be freed. - */ - DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r);); + DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r);); DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r);); @@ -1572,140 +1804,6 @@ static int remove_from_waiters_ms(struct dlm_lkb *lkb, return error; } -static void shrink_bucket(struct dlm_ls *ls) -{ - struct dlm_rsb *r, *safe; - char *name; - int our_nodeid = dlm_our_nodeid(); - int remote_count = 0; - int need_shrink = 0; - int i, len, rv; - - memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX); - - spin_lock_bh(&ls->ls_rsbtbl_lock); - - if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags)) { - spin_unlock_bh(&ls->ls_rsbtbl_lock); - return; - } - - list_for_each_entry_safe(r, safe, &ls->ls_toss, res_rsbs_list) { - /* If we're the directory record for this rsb, and - we're not the master of it, then we need to wait - for the master node to send us a dir remove for - before removing the dir record. */ - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid != our_nodeid) && - (dlm_dir_nodeid(r) == our_nodeid)) { - continue; - } - - need_shrink = 1; - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - continue; - } - - if (!dlm_no_directory(ls) && - (r->res_master_nodeid == our_nodeid) && - (dlm_dir_nodeid(r) != our_nodeid)) { - - /* We're the master of this rsb but we're not - the directory record, so we need to tell the - dir node to remove the dir record. */ - - ls->ls_remove_lens[remote_count] = r->res_length; - memcpy(ls->ls_remove_names[remote_count], r->res_name, - DLM_RESNAME_MAXLEN); - remote_count++; - - if (remote_count >= DLM_REMOVE_NAMES_MAX) - break; - continue; - } - - list_del(&r->res_rsbs_list); - rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, - dlm_rhash_rsb_params); - free_toss_rsb(r); - } - - if (need_shrink) - set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags); - else - clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl_flags); - spin_unlock_bh(&ls->ls_rsbtbl_lock); - - /* - * While searching for rsb's to free, we found some that require - * remote removal. We leave them in place and find them again here - * so there is a very small gap between removing them from the toss - * list and sending the removal. Keeping this gap small is - * important to keep us (the master node) from being out of sync - * with the remote dir node for very long. - */ - - for (i = 0; i < remote_count; i++) { - name = ls->ls_remove_names[i]; - len = ls->ls_remove_lens[i]; - - spin_lock_bh(&ls->ls_rsbtbl_lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r); - if (rv) { - spin_unlock_bh(&ls->ls_rsbtbl_lock); - log_error(ls, "remove_name not found %s", name); - continue; - } - - if (!rsb_flag(r, RSB_TOSS)) { - spin_unlock_bh(&ls->ls_rsbtbl_lock); - log_debug(ls, "remove_name not toss %s", name); - continue; - } - - if (r->res_master_nodeid != our_nodeid) { - spin_unlock_bh(&ls->ls_rsbtbl_lock); - log_debug(ls, "remove_name master %d dir %d our %d %s", - r->res_master_nodeid, r->res_dir_nodeid, - our_nodeid, name); - continue; - } - - if (r->res_dir_nodeid == our_nodeid) { - /* should never happen */ - spin_unlock_bh(&ls->ls_rsbtbl_lock); - log_error(ls, "remove_name dir %d master %d our %d %s", - r->res_dir_nodeid, r->res_master_nodeid, - our_nodeid, name); - continue; - } - - if (!time_after_eq(jiffies, r->res_toss_time + - dlm_config.ci_toss_secs * HZ)) { - spin_unlock_bh(&ls->ls_rsbtbl_lock); - log_debug(ls, "remove_name toss_time %lu now %lu %s", - r->res_toss_time, jiffies, name); - continue; - } - - list_del(&r->res_rsbs_list); - rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, - dlm_rhash_rsb_params); - send_remove(r); - spin_unlock_bh(&ls->ls_rsbtbl_lock); - - free_toss_rsb(r); - } -} - -void dlm_scan_rsbs(struct dlm_ls *ls) -{ - shrink_bucket(ls); -} - /* lkb is master or local copy */ static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) diff --git a/fs/dlm/lock.h b/fs/dlm/lock.h index b56a34802762..8de9dee4c058 100644 --- a/fs/dlm/lock.h +++ b/fs/dlm/lock.h @@ -11,6 +11,7 @@ #ifndef __LOCK_DOT_H__ #define __LOCK_DOT_H__ +void dlm_rsb_toss_timer(struct timer_list *timer); void dlm_dump_rsb(struct dlm_rsb *r); void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len); void dlm_print_lkb(struct dlm_lkb *lkb); @@ -26,6 +27,7 @@ void dlm_scan_rsbs(struct dlm_ls *ls); int dlm_lock_recovery_try(struct dlm_ls *ls); void dlm_lock_recovery(struct dlm_ls *ls); void dlm_unlock_recovery(struct dlm_ls *ls); +void dlm_timer_resume(struct dlm_ls *ls); int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name, int len, unsigned int flags, int *r_nodeid, int *result); diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 890e1a4cf787..931eb3f22ec6 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -29,8 +29,6 @@ static int ls_count; static struct mutex ls_lock; static struct list_head lslist; static spinlock_t lslist_lock; -static struct task_struct * scand_task; - static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { @@ -247,64 +245,6 @@ void dlm_lockspace_exit(void) kset_unregister(dlm_kset); } -static struct dlm_ls *find_ls_to_scan(void) -{ - struct dlm_ls *ls; - - spin_lock_bh(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (time_after_eq(jiffies, ls->ls_scan_time + - dlm_config.ci_scan_secs * HZ)) { - atomic_inc(&ls->ls_count); - spin_unlock_bh(&lslist_lock); - return ls; - } - } - spin_unlock_bh(&lslist_lock); - return NULL; -} - -static int dlm_scand(void *data) -{ - struct dlm_ls *ls; - - while (!kthread_should_stop()) { - ls = find_ls_to_scan(); - if (ls) { - if (dlm_lock_recovery_try(ls)) { - ls->ls_scan_time = jiffies; - dlm_scan_rsbs(ls); - dlm_unlock_recovery(ls); - } else { - ls->ls_scan_time += HZ; - } - - dlm_put_lockspace(ls); - continue; - } - schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); - } - return 0; -} - -static int dlm_scand_start(void) -{ - struct task_struct *p; - int error = 0; - - p = kthread_run(dlm_scand, NULL, "dlm_scand"); - if (IS_ERR(p)) - error = PTR_ERR(p); - else - scand_task = p; - return error; -} - -static void dlm_scand_stop(void) -{ - kthread_stop(scand_task); -} - struct dlm_ls *dlm_find_lockspace_global(uint32_t id) { struct dlm_ls *ls; @@ -385,22 +325,9 @@ static int threads_start(void) /* Thread for sending/receiving messages for all lockspace's */ error = dlm_midcomms_start(); - if (error) { + if (error) log_print("cannot start dlm midcomms %d", error); - goto fail; - } - error = dlm_scand_start(); - if (error) { - log_print("cannot start dlm_scand thread %d", error); - goto midcomms_fail; - } - - return 0; - - midcomms_fail: - dlm_midcomms_stop(); - fail: return error; } @@ -412,7 +339,7 @@ static int new_lockspace(const char *name, const char *cluster, struct dlm_ls *ls; int do_unreg = 0; int namelen = strlen(name); - int i, error; + int error; if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) return -EINVAL; @@ -503,13 +430,6 @@ static int new_lockspace(const char *name, const char *cluster, if (error) goto out_lsfree; - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { - ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, - GFP_KERNEL); - if (!ls->ls_remove_names[i]) - goto out_rsbtbl; - } - idr_init(&ls->ls_lkbidr); spin_lock_init(&ls->ls_lkbidr_spin); @@ -582,6 +502,11 @@ static int new_lockspace(const char *name, const char *cluster, INIT_LIST_HEAD(&ls->ls_dir_dump_list); rwlock_init(&ls->ls_dir_dump_lock); + INIT_LIST_HEAD(&ls->ls_toss_q); + spin_lock_init(&ls->ls_toss_q_lock); + timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, + TIMER_DEFERRABLE); + spin_lock_bh(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); @@ -661,9 +586,6 @@ static int new_lockspace(const char *name, const char *cluster, kfree(ls->ls_recover_buf); out_lkbidr: idr_destroy(&ls->ls_lkbidr); - out_rsbtbl: - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) - kfree(ls->ls_remove_names[i]); rhashtable_destroy(&ls->ls_rsbtbl); out_lsfree: if (do_unreg) @@ -696,7 +618,6 @@ static int __dlm_new_lockspace(const char *name, const char *cluster, if (error > 0) error = 0; if (!ls_count) { - dlm_scand_stop(); dlm_midcomms_shutdown(); dlm_midcomms_stop(); } @@ -777,7 +698,7 @@ static void rhash_free_rsb(void *ptr, void *arg) static int release_lockspace(struct dlm_ls *ls, int force) { struct dlm_rsb *rsb; - int i, busy, rv; + int busy, rv; busy = lockspace_busy(ls, force); @@ -812,8 +733,13 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_recoverd_stop(ls); + /* clear the LSFL_RUNNING flag to fast up + * time_shutdown_sync(), we don't care anymore + */ + clear_bit(LSFL_RUNNING, &ls->ls_flags); + timer_shutdown_sync(&ls->ls_timer); + if (ls_count == 1) { - dlm_scand_stop(); dlm_clear_members(ls); dlm_midcomms_shutdown(); } @@ -839,9 +765,6 @@ static int release_lockspace(struct dlm_ls *ls, int force) */ rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) - kfree(ls->ls_remove_names[i]); - while (!list_empty(&ls->ls_new_rsb)) { rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain); diff --git a/fs/dlm/member.c b/fs/dlm/member.c index 6401916a97ef..c46e306f2e5c 100644 --- a/fs/dlm/member.c +++ b/fs/dlm/member.c @@ -641,6 +641,8 @@ int dlm_ls_stop(struct dlm_ls *ls) spin_lock_bh(&ls->ls_recover_lock); set_bit(LSFL_RECOVER_STOP, &ls->ls_flags); new = test_and_clear_bit(LSFL_RUNNING, &ls->ls_flags); + if (new) + timer_delete_sync(&ls->ls_timer); ls->ls_recover_seq++; /* activate requestqueue and stop processing */ diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c index d43189532b14..960a14b95605 100644 --- a/fs/dlm/recover.c +++ b/fs/dlm/recover.c @@ -889,6 +889,11 @@ void dlm_clear_toss(struct dlm_ls *ls) list_del(&r->res_rsbs_list); rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node, dlm_rhash_rsb_params); + + /* remove it from the toss queue if its part of it */ + if (!list_empty(&r->res_toss_q_list)) + list_del_init(&r->res_toss_q_list); + free_toss_rsb(r); count++; } diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c index 5e8e10030b74..c831e0275912 100644 --- a/fs/dlm/recoverd.c +++ b/fs/dlm/recoverd.c @@ -98,6 +98,16 @@ static int enable_locking(struct dlm_ls *ls, uint64_t seq) spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) { set_bit(LSFL_RUNNING, &ls->ls_flags); + /* Schedule next timer if recovery put something on toss. + * + * The rsbs that was queued while recovery on toss hasn't + * started yet because LSFL_RUNNING was set everything + * else recovery hasn't started as well because ls_in_recovery + * is still hold. So we should not run into the case that + * dlm_timer_resume() queues a timer that can occur in + * a no op. + */ + dlm_timer_resume(ls); /* unblocks processes waiting to enter the dlm */ up_write(&ls->ls_in_recovery); clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags); |