From aa7ab1e20882b04fc3e45da77a9dad5cbbefba99 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:48 -0400 Subject: fs: dlm: synchronize dlm before shutdown This patch moves the dlm workqueue synchronization before the shutdown handling. It simply flushes all pending work before starting to shut down the connections. At least for the send_workqueue we should flush the workqueue to make sure no new connection handling is going on, as the dlm_allow_conn switch has already been turned to false. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 5050fe05769b..ed098870ba0d 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1624,10 +1624,6 @@ static void work_flush(void) struct hlist_node *n; struct connection *con; - if (recv_workqueue) - flush_workqueue(recv_workqueue); - if (send_workqueue) - flush_workqueue(send_workqueue); do { ok = 1; foreach_conn(stop_conn); @@ -1659,6 +1655,12 @@ void dlm_lowcomms_stop(void) mutex_lock(&connections_lock); dlm_allow_conn = 0; mutex_unlock(&connections_lock); + + if (recv_workqueue) + flush_workqueue(recv_workqueue); + if (send_workqueue) + flush_workqueue(send_workqueue); + foreach_conn(shutdown_conn); work_flush(); clean_writequeues(); -- cgit v1.2.3 From a47666eb763cc1b8b48bd88185ca56676f40ca89 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:49 -0400 Subject: fs: dlm: make connection hash lockless There are some problems with the connections_lock. During my experiments I sometimes saw circular dependencies with sock_lock. The reason might be code paths which run nodeid2con() before or after sock_lock is acquired. Another issue is the missing locking in the foreach_conn() iteration. This may currently work because foreach_conn() runs in a context where connection_hash can no longer be manipulated by others. This patch changes the connection_hash to be protected by sleepable RCU. The hotpath function __find_con() is implemented lockless, as it is only a reader of connection_hash, and this hopefully fixes the circular locking dependencies. The foreach_conn() iteration still calls sleepable functionality, which is why sleepable RCU is used here. This patch also removes the kmem_cache functionality, because frees now need to go through call_rcu() and allocation time is not an issue here. The dlm_allow_conn flag is no longer protected by a lock; it is enough to set it and flush the workqueues afterwards. A condensed sketch of the resulting locking pattern follows the Kconfig hunk below. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/Kconfig | 1 + fs/dlm/lowcomms.c | 86 +++++++++++++++++++++++-------------------------------- 2 files changed, 37 insertions(+), 50 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/Kconfig b/fs/dlm/Kconfig index f82a4952769d..ee92634196a8 100644 --- a/fs/dlm/Kconfig +++ b/fs/dlm/Kconfig @@ -4,6 +4,7 @@ menuconfig DLM depends on INET depends on SYSFS && CONFIGFS_FS && (IPV6 || IPV6=n) select IP_SCTP + select SRCU help A general purpose distributed lock manager for kernel or userspace applications.
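For readers unfamiliar with sleepable RCU, here is a condensed, kernel-style sketch of the locking pattern the above patch adopts. The names mirror the patch, but this is an illustration, not the literal upstream code: readers walk the hash under an SRCU read lock, writers serialize list updates with a spinlock, and frees are deferred with kfree_rcu() until every reader has left. Returning the pointer after srcu_read_unlock() is only safe here because dlm keeps connections alive until lowcomms shutdown.

    static struct hlist_head connection_hash[CONN_HASH_SIZE];
    static DEFINE_SPINLOCK(connections_lock);  /* serializes writers only */
    DEFINE_STATIC_SRCU(connections_srcu);      /* readers, may sleep */

    static struct connection *find_con(int nodeid)
    {
            int idx = srcu_read_lock(&connections_srcu);
            struct connection *con;

            hlist_for_each_entry_rcu(con, &connection_hash[nodeid_hash(nodeid)], list) {
                    if (con->nodeid == nodeid) {
                            srcu_read_unlock(&connections_srcu, idx);
                            return con;
                    }
            }
            srcu_read_unlock(&connections_srcu, idx);
            return NULL;
    }

    static void add_con(struct connection *con)
    {
            spin_lock(&connections_lock);
            hlist_add_head_rcu(&con->list, &connection_hash[nodeid_hash(con->nodeid)]);
            spin_unlock(&connections_lock);
    }

    static void del_con(struct connection *con)
    {
            spin_lock(&connections_lock);
            hlist_del_rcu(&con->list);
            spin_unlock(&connections_lock);
            kfree_rcu(con, rcu);    /* struct connection gained an rcu_head for this */
    }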
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index ed098870ba0d..9db7126de793 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -126,6 +126,7 @@ struct connection { struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ + struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -167,8 +168,8 @@ static struct workqueue_struct *recv_workqueue; static struct workqueue_struct *send_workqueue; static struct hlist_head connection_hash[CONN_HASH_SIZE]; -static DEFINE_MUTEX(connections_lock); -static struct kmem_cache *con_cache; +static DEFINE_SPINLOCK(connections_lock); +DEFINE_STATIC_SRCU(connections_srcu); static void process_recv_sockets(struct work_struct *work); static void process_send_sockets(struct work_struct *work); @@ -184,15 +185,20 @@ static inline int nodeid_hash(int nodeid) static struct connection *__find_con(int nodeid) { - int r; + int r, idx; struct connection *con; r = nodeid_hash(nodeid); - hlist_for_each_entry(con, &connection_hash[r], list) { - if (con->nodeid == nodeid) + idx = srcu_read_lock(&connections_srcu); + hlist_for_each_entry_rcu(con, &connection_hash[r], list) { + if (con->nodeid == nodeid) { + srcu_read_unlock(&connections_srcu, idx); return con; + } } + srcu_read_unlock(&connections_srcu, idx); + return NULL; } @@ -200,7 +206,7 @@ static struct connection *__find_con(int nodeid) * If 'allocation' is zero then we don't attempt to create a new * connection structure for this node. */ -static struct connection *__nodeid2con(int nodeid, gfp_t alloc) +static struct connection *nodeid2con(int nodeid, gfp_t alloc) { struct connection *con = NULL; int r; @@ -209,13 +215,10 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) if (con || !alloc) return con; - con = kmem_cache_zalloc(con_cache, alloc); + con = kzalloc(sizeof(*con), alloc); if (!con) return NULL; - r = nodeid_hash(nodeid); - hlist_add_head(&con->list, &connection_hash[r]); - con->nodeid = nodeid; mutex_init(&con->sock_mutex); INIT_LIST_HEAD(&con->writequeue); @@ -233,31 +236,27 @@ static struct connection *__nodeid2con(int nodeid, gfp_t alloc) con->rx_action = zerocon->rx_action; } + r = nodeid_hash(nodeid); + + spin_lock(&connections_lock); + hlist_add_head_rcu(&con->list, &connection_hash[r]); + spin_unlock(&connections_lock); + return con; } /* Loop round all connections */ static void foreach_conn(void (*conn_func)(struct connection *c)) { - int i; - struct hlist_node *n; + int i, idx; struct connection *con; + idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE; i++) { - hlist_for_each_entry_safe(con, n, &connection_hash[i], list) + hlist_for_each_entry_rcu(con, &connection_hash[i], list) conn_func(con); } -} - -static struct connection *nodeid2con(int nodeid, gfp_t allocation) -{ - struct connection *con; - - mutex_lock(&connections_lock); - con = __nodeid2con(nodeid, allocation); - mutex_unlock(&connections_lock); - - return con; + srcu_read_unlock(&connections_srcu, idx); } static struct dlm_node_addr *find_node_addr(int nodeid) @@ -792,12 +791,9 @@ static int accept_from_sock(struct connection *con) struct connection *newcon; struct connection *addcon; - mutex_lock(&connections_lock); if (!dlm_allow_conn) { - mutex_unlock(&connections_lock); return -1; } - mutex_unlock(&connections_lock); mutex_lock_nested(&con->sock_mutex, 0); @@ -847,7 +843,7 @@ static int accept_from_sock(struct connection *con) struct 
connection *othercon = newcon->othercon; if (!othercon) { - othercon = kmem_cache_zalloc(con_cache, GFP_NOFS); + othercon = kzalloc(sizeof(*othercon), GFP_NOFS); if (!othercon) { log_print("failed to allocate incoming socket"); mutex_unlock(&newcon->sock_mutex); @@ -1612,16 +1608,17 @@ static void free_conn(struct connection *con) { close_connection(con, true, true, true); if (con->othercon) - kmem_cache_free(con_cache, con->othercon); - hlist_del(&con->list); - kmem_cache_free(con_cache, con); + kfree_rcu(con->othercon, rcu); + spin_lock(&connections_lock); + hlist_del_rcu(&con->list); + spin_unlock(&connections_lock); + kfree_rcu(con, rcu); } static void work_flush(void) { - int ok; + int ok, idx; int i; - struct hlist_node *n; struct connection *con; do { @@ -1631,9 +1628,10 @@ static void work_flush(void) flush_workqueue(recv_workqueue); if (send_workqueue) flush_workqueue(send_workqueue); + idx = srcu_read_lock(&connections_srcu); for (i = 0; i < CONN_HASH_SIZE && ok; i++) { - hlist_for_each_entry_safe(con, n, - &connection_hash[i], list) { + hlist_for_each_entry_rcu(con, &connection_hash[i], + list) { ok &= test_bit(CF_READ_PENDING, &con->flags); ok &= test_bit(CF_WRITE_PENDING, &con->flags); if (con->othercon) { @@ -1644,6 +1642,7 @@ static void work_flush(void) } } } + srcu_read_unlock(&connections_srcu, idx); } while (!ok); } @@ -1652,9 +1651,7 @@ void dlm_lowcomms_stop(void) /* Set all the flags to prevent any socket activity. */ - mutex_lock(&connections_lock); dlm_allow_conn = 0; - mutex_unlock(&connections_lock); if (recv_workqueue) flush_workqueue(recv_workqueue); @@ -1666,8 +1663,6 @@ void dlm_lowcomms_stop(void) clean_writequeues(); foreach_conn(free_conn); work_stop(); - - kmem_cache_destroy(con_cache); } int dlm_lowcomms_start(void) @@ -1686,16 +1681,9 @@ int dlm_lowcomms_start(void) goto fail; } - error = -ENOMEM; - con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection), - __alignof__(struct connection), 0, - NULL); - if (!con_cache) - goto fail; - error = work_start(); if (error) - goto fail_destroy; + goto fail; dlm_allow_conn = 1; @@ -1714,10 +1702,8 @@ fail_unlisten: con = nodeid2con(0,0); if (con) { close_connection(con, false, true, true); - kmem_cache_free(con_cache, con); + kfree_rcu(con, rcu); } -fail_destroy: - kmem_cache_destroy(con_cache); fail: return error; } -- cgit v1.2.3 From 043697f030c5c7889682c82f08e05adeb613939a Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:50 -0400 Subject: fs: dlm: fix dlm_local_addr memory leak This patch fixes the following memory leak, detected by kmemleak on umount of a gfs2 filesystem which removed the last lockspace: unreferenced object 0xffff9264f4f48f00 (size 128): comm "mount", pid 425, jiffies 4294690253 (age 48.159s) hex dump (first 32 bytes): 02 00 52 48 c0 a8 7a fb 00 00 00 00 00 00 00 00 ..RH..z......... 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................
backtrace: [<0000000067a34940>] kmemdup+0x18/0x40 [<00000000c935f9ab>] init_local+0x4c/0xa0 [<00000000bbd286ef>] dlm_lowcomms_start+0x28/0x160 [<00000000a86625cb>] dlm_new_lockspace+0x7e/0xb80 [<000000008df6cd63>] gdlm_mount+0x1cc/0x5de [<00000000b67df8c7>] gfs2_lm_mount.constprop.0+0x1a3/0x1d3 [<000000006642ac5e>] gfs2_fill_super+0x717/0xba9 [<00000000d3ab7118>] get_tree_bdev+0x17f/0x280 [<000000001975926e>] gfs2_get_tree+0x21/0x90 [<00000000561ce1c4>] vfs_get_tree+0x28/0xc0 [<000000007fecaf63>] path_mount+0x434/0xc00 [<00000000636b9594>] __x64_sys_mount+0xe3/0x120 [<00000000cc478a33>] do_syscall_64+0x33/0x40 [<00000000ce9ccf01>] entry_SYSCALL_64_after_hwframe+0x44/0xa9 Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 9db7126de793..d0ece252a0d9 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1234,6 +1234,14 @@ static void init_local(void) } } +static void deinit_local(void) +{ + int i; + + for (i = 0; i < dlm_local_count; i++) + kfree(dlm_local_addr[i]); +} + /* Initialise SCTP socket and bind to all interfaces */ static int sctp_listen_for_all(void) { @@ -1663,6 +1671,7 @@ void dlm_lowcomms_stop(void) clean_writequeues(); foreach_conn(free_conn); work_stop(); + deinit_local(); } int dlm_lowcomms_start(void) -- cgit v1.2.3 From 3d2825c8c6105b0f36f3ff72760799fa2e71420e Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:51 -0400 Subject: fs: dlm: fix configfs memory leak This patch fixes the following memory leak, detected by kmemleak on umount of a gfs2 filesystem which removed the last lockspace: unreferenced object 0xffff9264f482f600 (size 192): comm "dlm_controld", pid 325, jiffies 4294690276 (age 48.136s) hex dump (first 32 bytes): 00 00 00 00 00 00 00 00 6e 6f 64 65 73 00 00 00 ........nodes... 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ................ backtrace: [<00000000060481d7>] make_space+0x41/0x130 [<000000008d905d46>] configfs_mkdir+0x1a2/0x5f0 [<00000000729502cf>] vfs_mkdir+0x155/0x210 [<000000000369bcf1>] do_mkdirat+0x6d/0x110 [<00000000cc478a33>] do_syscall_64+0x33/0x40 [<00000000ce9ccf01>] entry_SYSCALL_64_after_hwframe+0x44/0xa9 The patch remembers the "nodes" entry pointer in struct dlm_space, since it is created as a subdirectory when its parent "spaces" entry is created. In drop_space() we would otherwise lose the pointer reference to nds because of configfs_remove_default_groups(). However, as this subdirectory is always available while "spaces" exists, it can simply be freed when "spaces" is freed. A minimal sketch of this ownership pattern follows.
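As a reading aid, this is roughly the shape of the fix described above, with names following the patch but surrounding details (group types, error path) approximated for illustration: the default "nodes" group is attached in make_space(), its pointer is remembered in the parent, and it is freed together with the parent in the release callback.

    static struct config_group *make_space(struct config_group *g, const char *name)
    {
            struct dlm_space *sp = kzalloc(sizeof(*sp), GFP_NOFS);
            struct dlm_nodes *nds = kzalloc(sizeof(*nds), GFP_NOFS);

            if (!sp || !nds) {
                    kfree(sp);
                    kfree(nds);
                    return ERR_PTR(-ENOMEM);
            }

            config_group_init_type_name(&sp->group, name, &space_type);
            config_group_init_type_name(&nds->ns_group, "nodes", &nodes_type);
            configfs_add_default_group(&nds->ns_group, &sp->group);

            sp->nds = nds;  /* drop_space() loses this reference, so keep it */
            return &sp->group;
    }

    static void release_space(struct config_item *i)
    {
            struct dlm_space *sp = config_item_to_space(i);

            kfree(sp->nds); /* "nodes" lives exactly as long as its "spaces" entry */
            kfree(sp);
    }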
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'fs/dlm') diff --git a/fs/dlm/config.c b/fs/dlm/config.c index 47f0b98b707f..f33a7e4ae917 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -221,6 +221,7 @@ struct dlm_space { struct list_head members; struct mutex members_lock; int members_count; + struct dlm_nodes *nds; }; struct dlm_comms { @@ -430,6 +431,7 @@ static struct config_group *make_space(struct config_group *g, const char *name) INIT_LIST_HEAD(&sp->members); mutex_init(&sp->members_lock); sp->members_count = 0; + sp->nds = nds; return &sp->group; fail: @@ -451,6 +453,7 @@ static void drop_space(struct config_group *g, struct config_item *i) static void release_space(struct config_item *i) { struct dlm_space *sp = config_item_to_space(i); + kfree(sp->nds); kfree(sp); } -- cgit v1.2.3 From 0de984323ac56aa420e6f28d7ce205a293fdb649 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:52 -0400 Subject: fs: dlm: move free writequeue into con free This patch moves the freeing of the struct connection member writequeue into the path where the struct connection itself is freed, instead of doing two iterations. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index d0ece252a0d9..04afc7178afb 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1550,13 +1550,6 @@ static void process_send_sockets(struct work_struct *work) send_to_sock(con); } - -/* Discard all entries on the write queues */ -static void clean_writequeues(void) -{ - foreach_conn(clean_one_writequeue); -} - static void work_stop(void) { if (recv_workqueue) @@ -1620,6 +1613,7 @@ static void free_conn(struct connection *con) spin_lock(&connections_lock); hlist_del_rcu(&con->list); spin_unlock(&connections_lock); + clean_one_writequeue(con); kfree_rcu(con, rcu); } @@ -1668,7 +1662,6 @@ void dlm_lowcomms_stop(void) foreach_conn(shutdown_conn); work_flush(); - clean_writequeues(); foreach_conn(free_conn); work_stop(); deinit_local(); -- cgit v1.2.3 From 948c47e9bcb6a42229cb1da1cc350c887a33ebb8 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:53 -0400 Subject: fs: dlm: handle possible othercon writequeues This patch frees any possible remaining writequeue entries in the othercon member of struct connection. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 04afc7178afb..794216eb728c 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1608,11 +1608,13 @@ static void shutdown_conn(struct connection *con) static void free_conn(struct connection *con) { close_connection(con, true, true, true); - if (con->othercon) - kfree_rcu(con->othercon, rcu); spin_lock(&connections_lock); hlist_del_rcu(&con->list); spin_unlock(&connections_lock); + if (con->othercon) { + clean_one_writequeue(con->othercon); + kfree_rcu(con->othercon, rcu); + } clean_one_writequeue(con); kfree_rcu(con, rcu); } -- cgit v1.2.3 From 7ae0451e2e6c29ff9fc17754b1129d9491177634 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 27 Aug 2020 15:02:54 -0400 Subject: fs: dlm: use free_con to free connection This patch uses the free_conn() functionality to free the listen connection if listening fails.
It also fixes an issue where a freed resource was still part of the connection_hash, as hlist_del() was not called in this case. The only difference is that free_conn() handles othercon as well, but othercon is never set for the listen connection. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 794216eb728c..1bf1808bfa6b 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -1704,10 +1704,8 @@ int dlm_lowcomms_start(void) fail_unlisten: dlm_allow_conn = 0; con = nodeid2con(0,0); - if (con) { - close_connection(con, false, true, true); - kfree_rcu(con, rcu); - } + if (con) + free_conn(con); fail: return error; } -- cgit v1.2.3 From 0461e0db941f8f49dcfd0576c4449f2e5beda2f6 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Sep 2020 10:31:22 -0400 Subject: fs: dlm: remove lock dependency warning During my experiments to make dlm robust against the tcpkill application, I sometimes ran into a circular lock dependency warning between clusters_root.subsys.su_mutex and con->sock_mutex. We don't need to hold the sock_mutex when getting the mark value, which takes the clusters_root.subsys.su_mutex. This patch moves that handling to just before the sock_mutex is taken. Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 1bf1808bfa6b..24f5e55313d8 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -971,6 +971,10 @@ static void sctp_connect_to_sock(struct connection *con) return; } + result = dlm_comm_mark(con->nodeid, &mark); + if (result < 0) + return; + mutex_lock(&con->sock_mutex); /* Some odd races can cause double-connects, ignore them */ @@ -995,11 +999,6 @@ if (result < 0) goto socket_err; - /* set skb mark */ - result = dlm_comm_mark(con->nodeid, &mark); - if (result < 0) - goto bind_err; - sock_set_mark(sock->sk, mark); con->rx_action = receive_from_sock; @@ -1072,6 +1071,10 @@ static void tcp_connect_to_sock(struct connection *con) return; } + result = dlm_comm_mark(con->nodeid, &mark); + if (result < 0) + return; + mutex_lock(&con->sock_mutex); if (con->retries++ > MAX_CONNECT_RETRIES) goto out; @@ -1086,11 +1089,6 @@ if (result < 0) goto out_err; - /* set skb mark */ - result = dlm_comm_mark(con->nodeid, &mark); - if (result < 0) - goto out_err; - sock_set_mark(sock->sk, mark); memset(&saddr, 0, sizeof(saddr)); -- cgit v1.2.3 From 3f78cd7d2449a07904b3a23751758cbdeaaa20f3 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Sep 2020 10:31:23 -0400 Subject: fs: dlm: fix mark per nodeid setting This patch sets the per-nodeid mark configuration for accepted sockets as well. Before this patch, only the listen socket's mark value was used for all accepted connections. This patch ensures that the cluster mark attribute value is always used for all sockets; if a per-nodeid mark value is specified, dlm will use that value for the specific node. A distilled sketch of this fallback follows.
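Distilled, the mark resolution implemented by the diff that follows is a two-level fallback. This sketch compresses the reworked dlm_comm_mark() for readability; it is an approximation, not the verbatim upstream code:

    /* Effective SO_MARK for a peer: the per-nodeid value if configured,
     * otherwise the cluster-wide default. The lookup can no longer fail,
     * hence the void return type.
     */
    void dlm_comm_mark(int nodeid, unsigned int *mark)
    {
            struct dlm_comm *cm = get_comm(nodeid);

            if (cm && cm->mark)
                    *mark = cm->mark;               /* per-nodeid override */
            else
                    *mark = dlm_config.ci_mark;     /* cluster default */

            if (cm)
                    put_comm(cm);
    }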
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 16 ++++++++++------ fs/dlm/config.h | 2 +- fs/dlm/lowcomms.c | 12 ++++++------ 3 files changed, 17 insertions(+), 13 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/config.c b/fs/dlm/config.c index f33a7e4ae917..ca4a9795afbe 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -860,18 +860,22 @@ int dlm_comm_seq(int nodeid, uint32_t *seq) return 0; } -int dlm_comm_mark(int nodeid, unsigned int *mark) +void dlm_comm_mark(int nodeid, unsigned int *mark) { struct dlm_comm *cm; cm = get_comm(nodeid); - if (!cm) - return -ENOENT; + if (!cm) { + *mark = dlm_config.ci_mark; + return; + } - *mark = cm->mark; - put_comm(cm); + if (cm->mark) + *mark = cm->mark; + else + *mark = dlm_config.ci_mark; - return 0; + put_comm(cm); } int dlm_our_nodeid(void) diff --git a/fs/dlm/config.h b/fs/dlm/config.h index f62996cad561..3b284ae9aeeb 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -46,7 +46,7 @@ void dlm_config_exit(void); int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out, int *count_out); int dlm_comm_seq(int nodeid, uint32_t *seq); -int dlm_comm_mark(int nodeid, unsigned int *mark); +void dlm_comm_mark(int nodeid, unsigned int *mark); int dlm_our_nodeid(void); int dlm_our_addr(struct sockaddr_storage *addr, int num); diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 24f5e55313d8..96f84541867c 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -790,6 +790,7 @@ static int accept_from_sock(struct connection *con) int nodeid; struct connection *newcon; struct connection *addcon; + unsigned int mark; if (!dlm_allow_conn) { return -1; @@ -826,6 +827,9 @@ static int accept_from_sock(struct connection *con) return -1; } + dlm_comm_mark(nodeid, &mark); + sock_set_mark(newsock->sk, mark); + log_print("got connection from %d", nodeid); /* Check to see if we already have a connection to this node. This @@ -971,9 +975,7 @@ static void sctp_connect_to_sock(struct connection *con) return; } - result = dlm_comm_mark(con->nodeid, &mark); - if (result < 0) - return; + dlm_comm_mark(con->nodeid, &mark); mutex_lock(&con->sock_mutex); @@ -1071,9 +1073,7 @@ static void tcp_connect_to_sock(struct connection *con) return; } - result = dlm_comm_mark(con->nodeid, &mark); - if (result < 0) - return; + dlm_comm_mark(con->nodeid, &mark); mutex_lock(&con->sock_mutex); if (con->retries++ > MAX_CONNECT_RETRIES) -- cgit v1.2.3 From e1a0ec30a571f176e9b324daba4c0e3f200fe882 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Sep 2020 10:31:24 -0400 Subject: fs: dlm: handle range check as callback This patch adds a callback parameter to the CLUSTER_ATTR macro, allowing individual callbacks for attributes whose range checking is more complex than just "non-zero". The sketch immediately below shows approximately what one generated attribute handler expands to.
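To make the macro indirection easier to read, here is a hand expansion of CLUSTER_ATTR(tcp_port, dlm_check_zero) as defined by the diff that follows; this is an approximation for illustration, not generated code:

    static ssize_t cluster_tcp_port_store(struct config_item *item,
                                          const char *buf, size_t len)
    {
            struct dlm_cluster *cl = config_item_to_cluster(item);

            /* cluster_set() parses buf and rejects the value when the
             * check callback returns true, i.e. when x == 0 here. */
            return cluster_set(cl, &cl->cl_tcp_port, &dlm_config.ci_tcp_port,
                               dlm_check_zero, buf, len);
    }

    static ssize_t cluster_tcp_port_show(struct config_item *item, char *buf)
    {
            struct dlm_cluster *cl = config_item_to_cluster(item);

            return snprintf(buf, PAGE_SIZE, "%u\n", cl->cl_tcp_port);
    }
    CONFIGFS_ATTR(cluster_, tcp_port);

Passing NULL as the callback skips validation entirely, which is how attributes such as log_debug keep accepting zero.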
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/config.c b/fs/dlm/config.c index ca4a9795afbe..e03b409a4df0 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -125,7 +125,7 @@ static ssize_t cluster_cluster_name_store(struct config_item *item, CONFIGFS_ATTR(cluster_, cluster_name); static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, - int *info_field, int check_zero, + int *info_field, bool (*check_cb)(unsigned int x), const char *buf, size_t len) { unsigned int x; @@ -137,7 +137,7 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, if (rc) return rc; - if (check_zero && !x) + if (check_cb && check_cb(x)) return -EINVAL; *cl_field = x; @@ -146,13 +146,13 @@ static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field, return len; } -#define CLUSTER_ATTR(name, check_zero) \ +#define CLUSTER_ATTR(name, check_cb) \ static ssize_t cluster_##name##_store(struct config_item *item, \ const char *buf, size_t len) \ { \ struct dlm_cluster *cl = config_item_to_cluster(item); \ return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name, \ - check_zero, buf, len); \ + check_cb, buf, len); \ } \ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ { \ @@ -161,20 +161,25 @@ static ssize_t cluster_##name##_show(struct config_item *item, char *buf) \ } \ CONFIGFS_ATTR(cluster_, name); -CLUSTER_ATTR(tcp_port, 1); -CLUSTER_ATTR(buffer_size, 1); -CLUSTER_ATTR(rsbtbl_size, 1); -CLUSTER_ATTR(recover_timer, 1); -CLUSTER_ATTR(toss_secs, 1); -CLUSTER_ATTR(scan_secs, 1); -CLUSTER_ATTR(log_debug, 0); -CLUSTER_ATTR(log_info, 0); -CLUSTER_ATTR(protocol, 0); -CLUSTER_ATTR(mark, 0); -CLUSTER_ATTR(timewarn_cs, 1); -CLUSTER_ATTR(waitwarn_us, 0); -CLUSTER_ATTR(new_rsb_count, 0); -CLUSTER_ATTR(recover_callbacks, 0); +static bool dlm_check_zero(unsigned int x) +{ + return !x; +} + +CLUSTER_ATTR(tcp_port, dlm_check_zero); +CLUSTER_ATTR(buffer_size, dlm_check_zero); +CLUSTER_ATTR(rsbtbl_size, dlm_check_zero); +CLUSTER_ATTR(recover_timer, dlm_check_zero); +CLUSTER_ATTR(toss_secs, dlm_check_zero); +CLUSTER_ATTR(scan_secs, dlm_check_zero); +CLUSTER_ATTR(log_debug, NULL); +CLUSTER_ATTR(log_info, NULL); +CLUSTER_ATTR(protocol, NULL); +CLUSTER_ATTR(mark, NULL); +CLUSTER_ATTR(timewarn_cs, dlm_check_zero); +CLUSTER_ATTR(waitwarn_us, NULL); +CLUSTER_ATTR(new_rsb_count, NULL); +CLUSTER_ATTR(recover_callbacks, NULL); static struct configfs_attribute *cluster_attrs[] = { [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port, -- cgit v1.2.3 From 4e192ee68e5af301470a925b76700d788db35d96 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Sep 2020 10:31:25 -0400 Subject: fs: dlm: disallow buffer size below default I observed that the upper layer will not send messages larger than this value. Consequently, the application receive buffer should not be smaller than that value, otherwise we are not able to deliver the dlm message to the upper layer. This patch forbids setting the receive buffer size below the maximum possible dlm message size.
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/config.c b/fs/dlm/config.c index e03b409a4df0..a4bed304a843 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -166,8 +166,14 @@ static bool dlm_check_zero(unsigned int x) return !x; } +#define DEFAULT_BUFFER_SIZE 4096 +static bool dlm_check_buffer_size(unsigned int x) +{ + return (x < DEFAULT_BUFFER_SIZE); +} + CLUSTER_ATTR(tcp_port, dlm_check_zero); -CLUSTER_ATTR(buffer_size, dlm_check_zero); +CLUSTER_ATTR(buffer_size, dlm_check_buffer_size); CLUSTER_ATTR(rsbtbl_size, dlm_check_zero); CLUSTER_ATTR(recover_timer, dlm_check_zero); CLUSTER_ATTR(toss_secs, dlm_check_zero); @@ -901,7 +907,6 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num) /* Config file defaults */ #define DEFAULT_TCP_PORT 21064 -#define DEFAULT_BUFFER_SIZE 4096 #define DEFAULT_RSBTBL_SIZE 1024 #define DEFAULT_RECOVER_TIMER 5 #define DEFAULT_TOSS_SECS 10 -- cgit v1.2.3 From 4798cbbfbd00c498339bdcf4cc2429f53eb374ec Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Thu, 24 Sep 2020 10:31:26 -0400 Subject: fs: dlm: rework receive handling This patch reworks the current receive handling of dlm. While trying to change the send handling to fix reorder issues, I took a look at the receive handling and simplified it. It works as follows: Each connection has a preallocated receive buffer with a minimum length of 4096 bytes. On receive, the upper layer protocol processes all dlm messages until there is not enough data left. If "leftover" data remains at the end of the receive buffer because a dlm message wasn't fully received, it is copied to the beginning of the preallocated receive buffer. On the next receive, more data is appended to the previous "leftover" data and processing begins again. This removes a lot of code of the current mechanism. Inside the processing functionality we ensure with a memmove() that each dlm message is memory aligned. Having a dlm message always start at the beginning of the buffer reduces the number of memmove() calls, because the src and dest pointers are then the same. The cluster attribute "buffer_size" gets a new meaning: it is now the size of the application layer receive buffer. If it is changed at runtime, the receive buffer will be reallocated. It is important that the receive buffer size is at least the maximum possible dlm message size, otherwise a received message cannot be placed inside the receive buffer. A small self-contained sketch of this leftover technique follows.
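The leftover technique itself is generic stream reassembly. Below is a small self-contained user-space analogue showing the same read / process / memmove cycle; the message format (a little-endian u16 length header that includes itself) and all function names are hypothetical stand-ins, not dlm's wire format:

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    #define BUFLEN 4096     /* mirrors the default receive buffer size */

    /* Hypothetical stand-in for dlm_receive_buffer(): consume one message. */
    static void deliver(const unsigned char *msg, uint16_t len)
    {
            printf("delivered message of %u bytes\n", (unsigned)len);
    }

    /* Parse complete messages; return bytes consumed. A partial trailing
     * message is left in place for the caller to carry over. */
    static int process_buffer(unsigned char *buf, int len)
    {
            unsigned char *ptr = buf;
            int ret = 0;

            while (len >= 2) {
                    uint16_t msglen = ptr[0] | (ptr[1] << 8); /* LE header */

                    if (msglen < 2 || msglen > BUFLEN)
                            return -1;      /* corrupt stream, abort */
                    if (msglen > len)
                            break;          /* incomplete: keep as leftover */

                    deliver(ptr, msglen);
                    ptr += msglen;
                    ret += msglen;
                    len -= msglen;
            }
            return ret;
    }

    static int receive_loop(int fd)
    {
            static unsigned char buf[BUFLEN];
            int leftover = 0;

            for (;;) {
                    ssize_t n = read(fd, buf + leftover, sizeof(buf) - leftover);
                    int buflen, used;

                    if (n <= 0)
                            return (int)n;  /* EOF or error */

                    buflen = leftover + (int)n;
                    used = process_buffer(buf, buflen);
                    if (used < 0)
                            return -1;

                    /* move the partial tail to the front for the next read */
                    leftover = buflen - used;
                    if (leftover)
                            memmove(buf, buf + used, leftover);
            }
    }

Because every complete message is consumed before the tail is moved, the memmove() is usually a no-op or very short, which is the same property the patch exploits.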
Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/config.c | 1 - fs/dlm/config.h | 2 + fs/dlm/lowcomms.c | 179 ++++++++++++++++++++++++++---------------------------- fs/dlm/midcomms.c | 136 ++++++++++++++++------------------------- fs/dlm/midcomms.h | 3 +- 5 files changed, 141 insertions(+), 180 deletions(-) (limited to 'fs/dlm') diff --git a/fs/dlm/config.c b/fs/dlm/config.c index a4bed304a843..49c5f9407098 100644 --- a/fs/dlm/config.c +++ b/fs/dlm/config.c @@ -166,7 +166,6 @@ static bool dlm_check_zero(unsigned int x) return !x; } -#define DEFAULT_BUFFER_SIZE 4096 static bool dlm_check_buffer_size(unsigned int x) { return (x < DEFAULT_BUFFER_SIZE); diff --git a/fs/dlm/config.h b/fs/dlm/config.h index 3b284ae9aeeb..c210250a2581 100644 --- a/fs/dlm/config.h +++ b/fs/dlm/config.h @@ -12,6 +12,8 @@ #ifndef __CONFIG_DOT_H__ #define __CONFIG_DOT_H__ +#define DEFAULT_BUFFER_SIZE 4096 + struct dlm_config_node { int nodeid; int weight; diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 96f84541867c..b7b7360be609 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -65,40 +65,6 @@ #define MAX_SEND_MSG_COUNT 25 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -static bool cbuf_empty(struct cbuf *cb) -{ - return cb->len == 0; -} - struct connection { struct socket *sock; /* NULL if not connected */ uint32_t nodeid; /* So we know who we are in the list */ @@ -117,8 +83,6 @@ struct connection { int (*rx_action) (struct connection *); /* What to do when active */ void (*connect_action) (struct connection *); /* What to do to connect */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ - struct page *rx_page; - struct cbuf cb; int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -126,6 +90,9 @@ struct connection { struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ + unsigned char *rx_buf; + int rx_buflen; + int rx_leftover; struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -219,6 +186,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) if (!con) return NULL; + con->rx_buflen = dlm_config.ci_buffer_size; + con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); + if (!con->rx_buf) { + kfree(con); + return NULL; + } + con->nodeid = nodeid; mutex_init(&con->sock_mutex); INIT_LIST_HEAD(&con->writequeue); @@ -613,11 +587,8 @@ static void close_connection(struct connection *con, bool and_other, /* Will only re-enter once. 
*/ close_connection(con->othercon, false, true, true); } - if (con->rx_page) { - __free_page(con->rx_page); - con->rx_page = NULL; - } + con->rx_leftover = 0; con->retries = 0; mutex_unlock(&con->sock_mutex); clear_bit(CF_CLOSING, &con->flags); @@ -671,16 +642,33 @@ static void dlm_tcp_shutdown(struct connection *con) shutdown_connection(con); } +static int con_realloc_receive_buf(struct connection *con, int newlen) +{ + unsigned char *newbuf; + + newbuf = kmalloc(newlen, GFP_NOFS); + if (!newbuf) + return -ENOMEM; + + /* copy any leftover from last receive */ + if (con->rx_leftover) + memmove(newbuf, con->rx_buf, con->rx_leftover); + + /* swap to new buffer space */ + kfree(con->rx_buf); + con->rx_buflen = newlen; + con->rx_buf = newbuf; + + return 0; +} + /* Data received from remote end */ static int receive_from_sock(struct connection *con) { - int ret = 0; - struct msghdr msg = {}; - struct kvec iov[2]; - unsigned len; - int r; int call_again_soon = 0; - int nvec; + struct msghdr msg; + struct kvec iov; + int ret, buflen; mutex_lock(&con->sock_mutex); @@ -688,71 +676,55 @@ static int receive_from_sock(struct connection *con) ret = -EAGAIN; goto out_close; } + if (con->nodeid == 0) { ret = -EINVAL; goto out_close; } - if (con->rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - con->rx_page = alloc_page(GFP_ATOMIC); - if (con->rx_page == NULL) + /* realloc if we get new buffer size to read out */ + buflen = dlm_config.ci_buffer_size; + if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { + ret = con_realloc_receive_buf(con, buflen); + if (ret < 0) goto out_resched; - cbuf_init(&con->cb, PAGE_SIZE); } - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. 
+ /* calculate new buffer parameter regarding last receive and + * possible leftover bytes */ - iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); - iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); - iov[1].iov_len = 0; - nvec = 1; + iov.iov_base = con->rx_buf + con->rx_leftover; + iov.iov_len = con->rx_buflen - con->rx_leftover; - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&con->cb) >= con->cb.base) { - iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb); - iov[1].iov_len = con->cb.base; - iov[1].iov_base = page_address(con->rx_page); - nvec = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len); - - r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); if (ret <= 0) goto out_close; - else if (ret == len) + else if (ret == iov.iov_len) call_again_soon = 1; - cbuf_add(&con->cb, ret); - ret = dlm_process_incoming_buffer(con->nodeid, - page_address(con->rx_page), - con->cb.base, con->cb.len, - PAGE_SIZE); - if (ret < 0) { - log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d", - ret, page_address(con->rx_page), con->cb.base, - con->cb.len, r); - cbuf_eat(&con->cb, r); - } else { - cbuf_eat(&con->cb, ret); - } + /* new buflen according readed bytes and leftover from last receive */ + buflen = ret + con->rx_leftover; + ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); + if (ret < 0) + goto out_close; - if (cbuf_empty(&con->cb) && !call_again_soon) { - __free_page(con->rx_page); - con->rx_page = NULL; + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. 
+ */ + con->rx_leftover = buflen - ret; + if (con->rx_leftover) { + memmove(con->rx_buf, con->rx_buf + ret, + con->rx_leftover); + call_again_soon = true; } if (call_again_soon) goto out_resched; + mutex_unlock(&con->sock_mutex); return 0; @@ -854,6 +826,17 @@ static int accept_from_sock(struct connection *con) result = -ENOMEM; goto accept_err; } + + othercon->rx_buflen = dlm_config.ci_buffer_size; + othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); + if (!othercon->rx_buf) { + mutex_unlock(&newcon->sock_mutex); + kfree(othercon); + log_print("failed to allocate incoming socket receive buffer"); + result = -ENOMEM; + goto accept_err; + } + othercon->nodeid = nodeid; othercon->rx_action = receive_from_sock; mutex_init(&othercon->sock_mutex); @@ -1603,6 +1586,14 @@ static void shutdown_conn(struct connection *con) con->shutdown_action(con); } +static void connection_release(struct rcu_head *rcu) +{ + struct connection *con = container_of(rcu, struct connection, rcu); + + kfree(con->rx_buf); + kfree(con); +} + static void free_conn(struct connection *con) { close_connection(con, true, true, true); @@ -1611,10 +1602,10 @@ static void free_conn(struct connection *con) spin_unlock(&connections_lock); if (con->othercon) { clean_one_writequeue(con->othercon); - kfree_rcu(con->othercon, rcu); + call_rcu(&con->othercon->rcu, connection_release); } clean_one_writequeue(con); - kfree_rcu(con, rcu); + call_rcu(&con->rcu, connection_release); } static void work_flush(void) diff --git a/fs/dlm/midcomms.c b/fs/dlm/midcomms.c index 921322d133e3..fde3a6afe4be 100644 --- a/fs/dlm/midcomms.c +++ b/fs/dlm/midcomms.c @@ -22,114 +22,84 @@ * into packets and sends them to the comms layer. */ +#include + #include "dlm_internal.h" #include "lowcomms.h" #include "config.h" #include "lock.h" #include "midcomms.h" - -static void copy_from_cb(void *dst, const void *base, unsigned offset, - unsigned len, unsigned limit) -{ - unsigned copy = len; - - if ((copy + offset) > limit) - copy = limit - offset; - memcpy(dst, base + offset, copy); - len -= copy; - if (len) - memcpy(dst + copy, base, len); -} - /* * Called from the low-level comms layer to process a buffer of * commands. - * - * Only complete messages are processed here, any "spare" bytes from - * the end of a buffer are saved and tacked onto the front of the next - * message that comes in. I doubt this will happen very often but we - * need to be able to cope with it and I don't want the task to be waiting - * for packets to come in when there is useful work to be done. */ -int dlm_process_incoming_buffer(int nodeid, const void *base, - unsigned offset, unsigned len, unsigned limit) +int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len) { - union { - unsigned char __buf[DLM_INBUF_LEN]; - /* this is to force proper alignment on some arches */ - union dlm_packet p; - } __tmp; - union dlm_packet *p = &__tmp.p; - int ret = 0; - int err = 0; + const unsigned char *ptr = buf; + const struct dlm_header *hd; uint16_t msglen; - uint32_t lockspace; - - while (len > sizeof(struct dlm_header)) { - - /* Copy just the header to check the total length. The - message may wrap around the end of the buffer back to the - start, so we need to use a temp buffer and copy_from_cb. 
*/ - copy_from_cb(p, base, offset, sizeof(struct dlm_header), limit); - msglen = le16_to_cpu(p->header.h_length); lockspace = p->header.h_lockspace; + int ret = 0; - err = -EINVAL; - if (msglen < sizeof(struct dlm_header)) - break; - if (p->header.h_cmd == DLM_MSG) { - if (msglen < sizeof(struct dlm_message)) - break; - } else { - if (msglen < sizeof(struct dlm_rcom)) - break; - } - err = -E2BIG; - if (msglen > dlm_config.ci_buffer_size) { - log_print("message size %d from %d too big, buf len %d", - msglen, nodeid, len); - break; + while (len >= sizeof(struct dlm_header)) { + hd = (struct dlm_header *)ptr; + + /* no message should be more than this otherwise we + * cannot deliver this message to upper layers + */ + msglen = get_unaligned_le16(&hd->h_length); + if (msglen > DEFAULT_BUFFER_SIZE) { + log_print("received invalid length header: %u, will abort message parsing", + msglen); + return -EBADMSG; } - err = 0; - - /* If only part of the full message is contained in this - buffer, then do nothing and wait for lowcomms to call - us again later with more data. We return 0 meaning - we've consumed none of the input buffer. */ + /* caller will take care that leftover + * will be parsed next call with more data + */ if (msglen > len) break; - /* Allocate a larger temp buffer if the full message won't fit - in the buffer on the stack (which should work for most - ordinary messages). */ - - if (msglen > sizeof(__tmp) && p == &__tmp.p) { - p = kmalloc(dlm_config.ci_buffer_size, GFP_NOFS); - if (p == NULL) - return ret; - } + switch (hd->h_cmd) { + case DLM_MSG: + if (msglen < sizeof(struct dlm_message)) { + log_print("dlm msg too small: %u, will skip this message", + msglen); + goto skip; + } - copy_from_cb(p, base, offset, msglen, limit); + break; + case DLM_RCOM: + if (msglen < sizeof(struct dlm_rcom)) { + log_print("dlm rcom msg too small: %u, will skip this message", + msglen); + goto skip; + } - BUG_ON(lockspace != p->header.h_lockspace); + break; + default: + log_print("unsupported h_cmd received: %u, will skip this message", + hd->h_cmd); + goto skip; + } + /* for aligned memory access, we just copy current message + * to begin of the buffer which contains already parsed buffer + * data and should provide align access for upper layers + * because the start address of the buffer has a aligned + * address. This memmove can be removed when the upperlayer + * is capable of unaligned memory access. + */ + memmove(buf, ptr, msglen); + dlm_receive_buffer((union dlm_packet *)buf, nodeid); + +skip: ret += msglen; - offset += msglen; - offset &= (limit - 1); len -= msglen; - - dlm_receive_buffer(p, nodeid); + ptr += msglen; } - if (p != &__tmp.p) - kfree(p); - - return err ? err : ret; + return ret; } diff --git a/fs/dlm/midcomms.h b/fs/dlm/midcomms.h index 2e122e81c8d0..61e90a921849 100644 --- a/fs/dlm/midcomms.h +++ b/fs/dlm/midcomms.h @@ -12,8 +12,7 @@ #ifndef __MIDCOMMS_DOT_H__ #define __MIDCOMMS_DOT_H__ -int dlm_process_incoming_buffer(int nodeid, const void *base, unsigned offset, - unsigned len, unsigned limit); +int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int buflen); #endif /* __MIDCOMMS_DOT_H__ */ -- cgit v1.2.3 From 4f2b30fd9b4bd6e3620fe55786df7fc5f89ad526 Mon Sep 17 00:00:00 2001 From: Alexander Aring Date: Wed, 30 Sep 2020 18:37:29 -0400 Subject: fs: dlm: fix race in nodeid2con This patch fixes a race in nodeid2con() for cases where two lookups run in parallel and both try to create a connection structure for the same nodeid.
Creating a new connection structure is a rare case, so to keep the reader lockless we simply repeat the lookup inside the protected section and drop our own work if this race happens. A generic sketch of this double-checked creation pattern is appended after the diff. Fixes: a47666eb763cc ("fs: dlm: make connection hash lockless") Signed-off-by: Alexander Aring Signed-off-by: David Teigland --- fs/dlm/lowcomms.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) (limited to 'fs/dlm') diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index b7b7360be609..79f56f16bc2c 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -175,7 +175,7 @@ static struct connection *__find_con(int nodeid) */ static struct connection *nodeid2con(int nodeid, gfp_t alloc) { - struct connection *con = NULL; + struct connection *con, *tmp; int r; con = __find_con(nodeid); @@ -213,6 +213,20 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) r = nodeid_hash(nodeid); spin_lock(&connections_lock); + /* Because multiple workqueues/threads calls this function it can + * race on multiple cpu's. Instead of locking hot path __find_con() + * we just check in rare cases of recently added nodes again + * under protection of connections_lock. If this is the case we + * abort our connection creation and return the existing connection. + */ + tmp = __find_con(nodeid); + if (tmp) { + spin_unlock(&connections_lock); + kfree(con->rx_buf); + kfree(con); + return tmp; + } + hlist_add_head_rcu(&con->list, &connection_hash[r]); spin_unlock(&connections_lock); -- cgit v1.2.3
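The fix above is an instance of the classic optimistic-allocation pattern for structures with lockless readers. A generic sketch, with the helper names alloc_and_init_con() and free_con_early() being hypothetical placeholders: allocate outside the lock (the allocation may sleep), then re-check under the lock and discard our copy if another thread won the race.

    static struct connection *get_or_create(int nodeid)
    {
            struct connection *con, *tmp;

            con = find_con(nodeid);         /* lockless fast path (SRCU) */
            if (con)
                    return con;

            con = alloc_and_init_con(nodeid);       /* may sleep, no lock held */
            if (!con)
                    return NULL;

            spin_lock(&connections_lock);
            tmp = find_con(nodeid);         /* re-check: did someone beat us? */
            if (tmp) {
                    spin_unlock(&connections_lock);
                    free_con_early(con);    /* never published, plain free is safe */
                    return tmp;
            }
            hlist_add_head_rcu(&con->list, &connection_hash[nodeid_hash(nodeid)]);
            spin_unlock(&connections_lock);

            return con;
    }

Note that the loser's structure was never visible to readers, so it can be freed immediately with kfree() rather than kfree_rcu(), exactly as the patch does.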