summaryrefslogtreecommitdiffstats
path: root/net/rds/send.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/send.c')
-rw-r--r--net/rds/send.c82
1 files changed, 43 insertions, 39 deletions
diff --git a/net/rds/send.c b/net/rds/send.c
index b9e41afef323..81471b25373b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -53,14 +53,14 @@ module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
/*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
*/
void rds_send_reset(struct rds_connection *conn)
{
struct rds_message *rm, *tmp;
unsigned long flags;
- spin_lock_irqsave(&conn->c_send_lock, flags);
if (conn->c_xmit_rm) {
rm = conn->c_xmit_rm;
conn->c_xmit_rm = NULL;
@@ -69,11 +69,7 @@ void rds_send_reset(struct rds_connection *conn)
* independently) but as the connection is down, there's
* no ongoing RDMA to/from that memory */
rds_message_unmapped(rm);
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
-
rds_message_put(rm);
- } else {
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
}
conn->c_xmit_sg = 0;
@@ -98,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
spin_unlock_irqrestore(&conn->c_lock, flags);
}
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+ return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+ clear_bit(RDS_IN_XMIT, &conn->c_flags);
+ smp_mb__after_clear_bit();
+ /*
+ * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+ * hot path and finding waiters is very rare. We don't want to walk
+ * the system-wide hashed waitqueue buckets in the fast path only to
+ * almost never find waiters.
+ */
+ if (waitqueue_active(&conn->c_waitq))
+ wake_up_all(&conn->c_waitq);
+}
+
/*
* We're making the concious trade-off here to only send one message
* down the connection at a time.
@@ -119,12 +134,9 @@ int rds_send_xmit(struct rds_connection *conn)
unsigned int tmp;
struct scatterlist *sg;
int ret = 0;
- int gen = 0;
LIST_HEAD(to_be_dropped);
restart:
- if (!rds_conn_up(conn))
- goto out;
/*
* sendmsg calls here after having queued its message on the send
@@ -133,18 +145,25 @@ restart:
* avoids blocking the caller and trading per-connection data between
* caches per message.
*/
- if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+ if (!acquire_in_xmit(conn)) {
rds_stats_inc(s_send_lock_contention);
ret = -ENOMEM;
goto out;
}
- atomic_inc(&conn->c_senders);
+
+ /*
+ * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+ * we do the opposite to avoid races.
+ */
+ if (!rds_conn_up(conn)) {
+ release_in_xmit(conn);
+ ret = 0;
+ goto out;
+ }
if (conn->c_trans->xmit_prepare)
conn->c_trans->xmit_prepare(conn);
- gen = atomic_inc_return(&conn->c_send_generation);
-
/*
* spin trying to push headers and data down the connection until
* the connection doesn't make forward progress.
@@ -178,7 +197,7 @@ restart:
if (!rm) {
unsigned int len;
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (!list_empty(&conn->c_send_queue)) {
rm = list_entry(conn->c_send_queue.next,
@@ -193,7 +212,7 @@ restart:
list_move_tail(&rm->m_conn_item, &conn->c_retrans);
}
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
if (!rm)
break;
@@ -207,10 +226,10 @@ restart:
*/
if (rm->rdma.op_active &&
test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
- spin_lock(&conn->c_lock);
+ spin_lock_irqsave(&conn->c_lock, flags);
if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
list_move(&rm->m_conn_item, &to_be_dropped);
- spin_unlock(&conn->c_lock);
+ spin_unlock_irqrestore(&conn->c_lock, flags);
continue;
}
@@ -336,19 +355,7 @@ restart:
if (conn->c_trans->xmit_complete)
conn->c_trans->xmit_complete(conn);
- /*
- * We might be racing with another sender who queued a message but
- * backed off on noticing that we held the c_send_lock. If we check
- * for queued messages after dropping the sem then either we'll
- * see the queued message or the queuer will get the sem. If we
- * notice the queued message then we trigger an immediate retry.
- *
- * We need to be careful only to do this when we stopped processing
- * the send queue because it was empty. It's the only way we
- * stop processing the loop when the transport hasn't taken
- * responsibility for forward progress.
- */
- spin_unlock_irqrestore(&conn->c_send_lock, flags);
+ release_in_xmit(conn);
/* Nuke any messages we decided not to retransmit. */
if (!list_empty(&to_be_dropped)) {
@@ -358,13 +365,12 @@ restart:
rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
}
- atomic_dec(&conn->c_senders);
-
/*
- * Other senders will see we have c_send_lock and exit. We
- * need to recheck the send queue and race again for c_send_lock
- * to make sure messages don't just sit on the send queue, if
- * somebody hasn't already beat us into the loop.
+ * Other senders can queue a message after we last test the send queue
+ * but before we clear RDS_IN_XMIT. In that case they'd back off and
+ * not try and send their newly queued message. We need to check the
+ * send queue after having cleared RDS_IN_XMIT so that their message
+ * doesn't get stuck on the send queue.
*
* If the transport cannot continue (i.e ret != 0), then it must
* call us when more room is available, such as from the tx
@@ -374,9 +380,7 @@ restart:
smp_mb();
if (!list_empty(&conn->c_send_queue)) {
rds_stats_inc(s_send_lock_queue_raced);
- if (gen == atomic_read(&conn->c_send_generation)) {
- goto restart;
- }
+ goto restart;
}
}
out: