summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/rds/ib.h1
-rw-r--r--net/rds/ib_cm.c14
-rw-r--r--net/rds/ib_send.c47
3 files changed, 54 insertions, 8 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index acda2dbc6576..a13ced504145 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -108,6 +108,7 @@ struct rds_ib_connection {
struct rds_header *i_send_hdrs;
u64 i_send_hdrs_dma;
struct rds_ib_send_work *i_sends;
+ atomic_t i_signaled_sends;
/* rx */
struct tasklet_struct i_recv_tasklet;
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 10f6a8815cd0..123c7d33b54e 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -615,11 +615,18 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
}
/*
- * Don't wait for the send ring to be empty -- there may be completed
- * non-signaled entries sitting on there. We unmap these below.
+ * We want to wait for tx and rx completion to finish
+ * before we tear down the connection, but we have to be
+ * careful not to get stuck waiting on a send ring that
+ * only has unsignaled sends in it. We've shutdown new
+ * sends before getting here so by waiting for signaled
+ * sends to complete we're ensured that there will be no
+ * more tx processing.
*/
wait_event(rds_ib_ring_empty_wait,
- rds_ib_ring_empty(&ic->i_recv_ring));
+ rds_ib_ring_empty(&ic->i_recv_ring) &&
+ (atomic_read(&ic->i_signaled_sends) == 0));
+ tasklet_kill(&ic->i_recv_tasklet);
if (ic->i_send_hdrs)
ib_dma_free_coherent(dev,
@@ -729,6 +736,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);
#endif
+ atomic_set(&ic->i_signaled_sends, 0);
/*
* rds_ib_conn_shutdown() waits for these to be emptied so they
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index e88cb4af009b..15f75692574c 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -220,6 +220,18 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
}
/*
+ * The only fast path caller always has a non-zero nr, so we don't
+ * bother testing nr before performing the atomic sub.
+ */
+static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
+{
+ if ((atomic_sub_return(nr, &ic->i_signaled_sends) == 0) &&
+ waitqueue_active(&rds_ib_ring_empty_wait))
+ wake_up(&rds_ib_ring_empty_wait);
+ BUG_ON(atomic_read(&ic->i_signaled_sends) < 0);
+}
+
+/*
* The _oldest/_free ring operations here race cleanly with the alloc/unalloc
* operations performed in the send path. As the sender allocs and potentially
* unallocs the next free entry in the ring it doesn't alter which is
@@ -236,6 +248,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
u32 oldest;
u32 i = 0;
int ret;
+ int nr_sig = 0;
rdsdebug("cq %p conn %p\n", cq, conn);
rds_ib_stats_inc(s_ib_tx_cq_call);
@@ -262,6 +275,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
for (i = 0; i < completed; i++) {
send = &ic->i_sends[oldest];
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
rm = rds_ib_send_unmap_op(ic, send, wc.status);
@@ -282,6 +297,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
}
rds_ib_ring_free(&ic->i_send_ring, completed);
+ rds_ib_sub_signaled(ic, nr_sig);
+ nr_sig = 0;
if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
test_bit(0, &conn->c_map_queued))
@@ -440,9 +457,9 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
}
-static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
- struct rds_ib_send_work *send,
- bool notify)
+static inline int rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+ struct rds_ib_send_work *send,
+ bool notify)
{
/*
* We want to delay signaling completions just enough to get
@@ -452,7 +469,9 @@ static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
if (ic->i_unsignaled_wrs-- == 0 || notify) {
ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
send->s_wr.send_flags |= IB_SEND_SIGNALED;
+ return 1;
}
+ return 0;
}
/*
@@ -488,6 +507,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
int bytes_sent = 0;
int ret;
int flow_controlled = 0;
+ int nr_sig = 0;
BUG_ON(off % RDS_FRAG_SIZE);
BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
@@ -645,6 +665,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && flow_controlled && i == (work_alloc-1))
send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+ if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+ nr_sig++;
+
rdsdebug("send %p wr %p num_sge %u next %p\n", send,
&send->s_wr, send->s_wr.num_sge, send->s_wr.next);
@@ -689,6 +712,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
if (ic->i_flowctl && i < credit_alloc)
rds_ib_send_add_credits(conn, credit_alloc - i);
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
/* XXX need to worry about failed_wr and partial sends. */
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
@@ -699,6 +725,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
if (prev->s_op) {
ic->i_data_op = prev->s_op;
prev->s_op = NULL;
@@ -728,6 +755,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
u32 pos;
u32 work_alloc;
int ret;
+ int nr_sig = 0;
rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
@@ -752,7 +780,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
send->s_wr.wr.atomic.compare_add = op->op_swap_add;
send->s_wr.wr.atomic.swap = 0;
}
- rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ nr_sig = rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.num_sge = 1;
send->s_wr.next = NULL;
send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
@@ -778,6 +806,9 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
send->s_sge[0].addr, send->s_sge[0].length);
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
failed_wr = &send->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
@@ -787,6 +818,7 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
goto out;
}
@@ -817,6 +849,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
int sent;
int ret;
int num_sge;
+ int nr_sig = 0;
/* map the op the first time we see it */
if (!op->op_mapped) {
@@ -859,7 +892,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
send->s_queued = jiffies;
send->s_op = NULL;
- rds_ib_set_wr_signal_state(ic, send, op->op_notify);
+ nr_sig += rds_ib_set_wr_signal_state(ic, send, op->op_notify);
send->s_wr.opcode = op->op_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -910,6 +943,9 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
work_alloc = i;
}
+ if (nr_sig)
+ atomic_add(nr_sig, &ic->i_signaled_sends);
+
failed_wr = &first->s_wr;
ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
@@ -919,6 +955,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
printk(KERN_WARNING "RDS/IB: rdma ib_post_send to %pI4 "
"returned %d\n", &conn->c_faddr, ret);
rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+ rds_ib_sub_signaled(ic, nr_sig);
goto out;
}