diff options
Diffstat (limited to 'net/rds/connection.c')
-rw-r--r-- | net/rds/connection.c | 159 |
1 files changed, 115 insertions, 44 deletions
diff --git a/net/rds/connection.c b/net/rds/connection.c index 7619b671ca28..9334d892366e 100644 --- a/net/rds/connection.c +++ b/net/rds/connection.c @@ -37,7 +37,6 @@ #include "rds.h" #include "loop.h" -#include "rdma.h" #define RDS_CONNECTION_HASH_BITS 12 #define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS) @@ -63,18 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr) var |= RDS_INFO_CONNECTION_FLAG_##suffix; \ } while (0) -static inline int rds_conn_is_sending(struct rds_connection *conn) -{ - int ret = 0; - - if (!mutex_trylock(&conn->c_send_lock)) - ret = 1; - else - mutex_unlock(&conn->c_send_lock); - - return ret; -} - +/* rcu read lock must be held or the connection spinlock */ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, __be32 laddr, __be32 faddr, struct rds_transport *trans) @@ -82,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, struct rds_connection *conn, *ret = NULL; struct hlist_node *pos; - hlist_for_each_entry(conn, pos, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { if (conn->c_faddr == faddr && conn->c_laddr == laddr && conn->c_trans == trans) { ret = conn; @@ -100,7 +88,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head, * and receiving over this connection again in the future. It is up to * the transport to have serialized this call with its send and recv. */ -void rds_conn_reset(struct rds_connection *conn) +static void rds_conn_reset(struct rds_connection *conn) { rdsdebug("connection %pI4 to %pI4 reset\n", &conn->c_laddr, &conn->c_faddr); @@ -129,10 +117,11 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, { struct rds_connection *conn, *parent = NULL; struct hlist_head *head = rds_conn_bucket(laddr, faddr); + struct rds_transport *loop_trans; unsigned long flags; int ret; - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); conn = rds_conn_lookup(head, laddr, faddr, trans); if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport && !is_outgoing) { @@ -143,12 +132,12 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, parent = conn; conn = parent->c_passive; } - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); if (conn) goto out; conn = kmem_cache_zalloc(rds_conn_slab, gfp); - if (conn == NULL) { + if (!conn) { conn = ERR_PTR(-ENOMEM); goto out; } @@ -159,7 +148,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, spin_lock_init(&conn->c_lock); conn->c_next_tx_seq = 1; - mutex_init(&conn->c_send_lock); + init_waitqueue_head(&conn->c_waitq); INIT_LIST_HEAD(&conn->c_send_queue); INIT_LIST_HEAD(&conn->c_retrans); @@ -175,7 +164,9 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, * can bind to the destination address then we'd rather the messages * flow through loopback rather than either transport. */ - if (rds_trans_get_preferred(faddr)) { + loop_trans = rds_trans_get_preferred(faddr); + if (loop_trans) { + rds_trans_put(loop_trans); conn->c_loopback = 1; if (is_outgoing && trans->t_prefer_loopback) { /* "outgoing" connection - and the transport @@ -238,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr, kmem_cache_free(rds_conn_slab, conn); conn = found; } else { - hlist_add_head(&conn->c_hash_node, head); + hlist_add_head_rcu(&conn->c_hash_node, head); rds_cong_add_conn(conn); rds_conn_count++; } @@ -263,21 +254,91 @@ struct rds_connection *rds_conn_create_outgoing(__be32 laddr, __be32 faddr, } EXPORT_SYMBOL_GPL(rds_conn_create_outgoing); +void rds_conn_shutdown(struct rds_connection *conn) +{ + /* shut it down unless it's down already */ + if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_DOWN)) { + /* + * Quiesce the connection mgmt handlers before we start tearing + * things down. We don't hold the mutex for the entire + * duration of the shutdown operation, else we may be + * deadlocking with the CM handler. Instead, the CM event + * handler is supposed to check for state DISCONNECTING + */ + mutex_lock(&conn->c_cm_lock); + if (!rds_conn_transition(conn, RDS_CONN_UP, RDS_CONN_DISCONNECTING) + && !rds_conn_transition(conn, RDS_CONN_ERROR, RDS_CONN_DISCONNECTING)) { + rds_conn_error(conn, "shutdown called in state %d\n", + atomic_read(&conn->c_state)); + mutex_unlock(&conn->c_cm_lock); + return; + } + mutex_unlock(&conn->c_cm_lock); + + wait_event(conn->c_waitq, + !test_bit(RDS_IN_XMIT, &conn->c_flags)); + + conn->c_trans->conn_shutdown(conn); + rds_conn_reset(conn); + + if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) { + /* This can happen - eg when we're in the middle of tearing + * down the connection, and someone unloads the rds module. + * Quite reproduceable with loopback connections. + * Mostly harmless. + */ + rds_conn_error(conn, + "%s: failed to transition to state DOWN, " + "current state is %d\n", + __func__, + atomic_read(&conn->c_state)); + return; + } + } + + /* Then reconnect if it's still live. + * The passive side of an IB loopback connection is never added + * to the conn hash, so we never trigger a reconnect on this + * conn - the reconnect is always triggered by the active peer. */ + cancel_delayed_work_sync(&conn->c_conn_w); + rcu_read_lock(); + if (!hlist_unhashed(&conn->c_hash_node)) { + rcu_read_unlock(); + rds_queue_reconnect(conn); + } else { + rcu_read_unlock(); + } +} + +/* + * Stop and free a connection. + * + * This can only be used in very limited circumstances. It assumes that once + * the conn has been shutdown that no one else is referencing the connection. + * We can only ensure this in the rmmod path in the current code. + */ void rds_conn_destroy(struct rds_connection *conn) { struct rds_message *rm, *rtmp; + unsigned long flags; rdsdebug("freeing conn %p for %pI4 -> " "%pI4\n", conn, &conn->c_laddr, &conn->c_faddr); - hlist_del_init(&conn->c_hash_node); + /* Ensure conn will not be scheduled for reconnect */ + spin_lock_irq(&rds_conn_lock); + hlist_del_init_rcu(&conn->c_hash_node); + spin_unlock_irq(&rds_conn_lock); + synchronize_rcu(); - /* wait for the rds thread to shut it down */ - atomic_set(&conn->c_state, RDS_CONN_ERROR); - cancel_delayed_work(&conn->c_conn_w); - queue_work(rds_wq, &conn->c_down_w); - flush_workqueue(rds_wq); + /* shut the connection down */ + rds_conn_drop(conn); + flush_work(&conn->c_down_w); + + /* make sure lingering queued work won't try to ref the conn */ + cancel_delayed_work_sync(&conn->c_send_w); + cancel_delayed_work_sync(&conn->c_recv_w); /* tear down queued messages */ list_for_each_entry_safe(rm, rtmp, @@ -302,7 +363,9 @@ void rds_conn_destroy(struct rds_connection *conn) BUG_ON(!list_empty(&conn->c_retrans)); kmem_cache_free(rds_conn_slab, conn); + spin_lock_irqsave(&rds_conn_lock, flags); rds_conn_count--; + spin_unlock_irqrestore(&rds_conn_lock, flags); } EXPORT_SYMBOL_GPL(rds_conn_destroy); @@ -316,23 +379,23 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, struct list_head *list; struct rds_connection *conn; struct rds_message *rm; - unsigned long flags; unsigned int total = 0; + unsigned long flags; size_t i; len /= sizeof(struct rds_info_message); - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { - hlist_for_each_entry(conn, pos, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { if (want_send) list = &conn->c_send_queue; else list = &conn->c_retrans; - spin_lock(&conn->c_lock); + spin_lock_irqsave(&conn->c_lock, flags); /* XXX too lazy to maintain counts.. */ list_for_each_entry(rm, list, m_conn_item) { @@ -343,11 +406,10 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len, conn->c_faddr, 0); } - spin_unlock(&conn->c_lock); + spin_unlock_irqrestore(&conn->c_lock, flags); } } - - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); lens->nr = total; lens->each = sizeof(struct rds_info_message); @@ -377,19 +439,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, uint64_t buffer[(item_len + 7) / 8]; struct hlist_head *head; struct hlist_node *pos; - struct hlist_node *tmp; struct rds_connection *conn; - unsigned long flags; size_t i; - spin_lock_irqsave(&rds_conn_lock, flags); + rcu_read_lock(); lens->nr = 0; lens->each = item_len; for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash); i++, head++) { - hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) { + hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) { /* XXX no c_lock usage.. */ if (!visitor(conn, buffer)) @@ -405,8 +465,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len, lens->nr++; } } - - spin_unlock_irqrestore(&rds_conn_lock, flags); + rcu_read_unlock(); } EXPORT_SYMBOL_GPL(rds_for_each_conn_info); @@ -423,8 +482,8 @@ static int rds_conn_info_visitor(struct rds_connection *conn, sizeof(cinfo->transport)); cinfo->flags = 0; - rds_conn_info_set(cinfo->flags, - rds_conn_is_sending(conn), SENDING); + rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &conn->c_flags), + SENDING); /* XXX Future: return the state rather than these funky bits */ rds_conn_info_set(cinfo->flags, atomic_read(&conn->c_state) == RDS_CONN_CONNECTING, @@ -444,12 +503,12 @@ static void rds_conn_info(struct socket *sock, unsigned int len, sizeof(struct rds_info_connection)); } -int __init rds_conn_init(void) +int rds_conn_init(void) { rds_conn_slab = kmem_cache_create("rds_connection", sizeof(struct rds_connection), 0, 0, NULL); - if (rds_conn_slab == NULL) + if (!rds_conn_slab) return -ENOMEM; rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info); @@ -487,6 +546,18 @@ void rds_conn_drop(struct rds_connection *conn) EXPORT_SYMBOL_GPL(rds_conn_drop); /* + * If the connection is down, trigger a connect. We may have scheduled a + * delayed reconnect however - in this case we should not interfere. + */ +void rds_conn_connect_if_down(struct rds_connection *conn) +{ + if (rds_conn_state(conn) == RDS_CONN_DOWN && + !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags)) + queue_delayed_work(rds_wq, &conn->c_conn_w, 0); +} +EXPORT_SYMBOL_GPL(rds_conn_connect_if_down); + +/* * An error occurred on the connection */ void |