summaryrefslogtreecommitdiffstats
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c7
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c13
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c53
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c32
-rw-r--r--net/sunrpc/xprtrdma/transport.c43
-rw-r--r--net/sunrpc/xprtrdma/verbs.c44
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h4
7 files changed, 111 insertions, 85 deletions
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ed1a4a3065ee..47ebac949769 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -44,13 +44,6 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
if (IS_ERR(req))
return PTR_ERR(req);
- rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
- DMA_TO_DEVICE, GFP_KERNEL);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_rdmabuf = rb;
- xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
-
size = r_xprt->rx_data.inline_rsize;
rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
if (IS_ERR(rb))
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index d5f95bb39300..5cc68a824f45 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -191,7 +191,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
- return ERR_PTR(-ENOBUFS);
+ return ERR_PTR(-EAGAIN);
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
@@ -251,6 +251,16 @@ out_maperr:
return ERR_PTR(-EIO);
}
+/* Post Send WR containing the RPC Call message.
+ */
+static int
+fmr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *bad_wr;
+
+ return ib_post_send(ia->ri_id->qp, &req->rl_sendctx->sc_wr, &bad_wr);
+}
+
/* Invalidate all memory regions that were registered for "req".
*
* Sleeps until it is safe for the host CPU to access the
@@ -305,6 +315,7 @@ out_reset:
const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_map = fmr_op_map,
+ .ro_send = fmr_op_send,
.ro_unmap_sync = fmr_op_unmap_sync,
.ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 90f688f19783..c5743a0960be 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -357,8 +357,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
struct rpcrdma_mr *mr;
struct ib_mr *ibmr;
struct ib_reg_wr *reg_wr;
- struct ib_send_wr *bad_wr;
- int rc, i, n;
+ int i, n;
u8 key;
mr = NULL;
@@ -367,7 +366,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
rpcrdma_mr_defer_recovery(mr);
mr = rpcrdma_mr_get(r_xprt);
if (!mr)
- return ERR_PTR(-ENOBUFS);
+ return ERR_PTR(-EAGAIN);
} while (mr->frwr.fr_state != FRWR_IS_INVALID);
frwr = &mr->frwr;
frwr->fr_state = FRWR_IS_VALID;
@@ -407,22 +406,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
ib_update_fast_reg_key(ibmr, ++key);
reg_wr = &frwr->fr_regwr;
- reg_wr->wr.next = NULL;
- reg_wr->wr.opcode = IB_WR_REG_MR;
- frwr->fr_cqe.done = frwr_wc_fastreg;
- reg_wr->wr.wr_cqe = &frwr->fr_cqe;
- reg_wr->wr.num_sge = 0;
- reg_wr->wr.send_flags = 0;
reg_wr->mr = ibmr;
reg_wr->key = ibmr->rkey;
reg_wr->access = writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
- rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
- if (rc)
- goto out_senderr;
-
mr->mr_handle = ibmr->rkey;
mr->mr_length = ibmr->length;
mr->mr_offset = ibmr->iova;
@@ -442,11 +431,40 @@ out_mapmr_err:
frwr->fr_mr, n, mr->mr_nents);
rpcrdma_mr_defer_recovery(mr);
return ERR_PTR(-EIO);
+}
-out_senderr:
- pr_err("rpcrdma: FRWR registration ib_post_send returned %i\n", rc);
- rpcrdma_mr_defer_recovery(mr);
- return ERR_PTR(-ENOTCONN);
+/* Post Send WR containing the RPC Call message.
+ *
+ * For FRMR, chain any FastReg WRs to the Send WR. Only a
+ * single ib_post_send call is needed to register memory
+ * and then post the Send WR.
+ */
+static int
+frwr_op_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ struct ib_send_wr *post_wr, *bad_wr;
+ struct rpcrdma_mr *mr;
+
+ post_wr = &req->rl_sendctx->sc_wr;
+ list_for_each_entry(mr, &req->rl_registered, mr_list) {
+ struct rpcrdma_frwr *frwr;
+
+ frwr = &mr->frwr;
+
+ frwr->fr_cqe.done = frwr_wc_fastreg;
+ frwr->fr_regwr.wr.next = post_wr;
+ frwr->fr_regwr.wr.wr_cqe = &frwr->fr_cqe;
+ frwr->fr_regwr.wr.num_sge = 0;
+ frwr->fr_regwr.wr.opcode = IB_WR_REG_MR;
+ frwr->fr_regwr.wr.send_flags = 0;
+
+ post_wr = &frwr->fr_regwr.wr;
+ }
+
+ /* If ib_post_send fails, the next ->send_request for
+ * @req will queue these MWs for recovery.
+ */
+ return ib_post_send(ia->ri_id->qp, post_wr, &bad_wr);
}
/* Handle a remotely invalidated mr on the @mrs list
@@ -561,6 +579,7 @@ reset_mrs:
const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_map = frwr_op_map,
+ .ro_send = frwr_op_send,
.ro_reminv = frwr_op_reminv,
.ro_unmap_sync = frwr_op_unmap_sync,
.ro_recover_mr = frwr_op_recover_mr,
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index f0855a959a27..e8adad33d0bb 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -365,7 +365,7 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
false, &mr);
if (IS_ERR(seg))
- return PTR_ERR(seg);
+ goto out_maperr;
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_read_segment(xdr, mr, pos) < 0)
@@ -377,6 +377,11 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
} while (nsegs);
return 0;
+
+out_maperr:
+ if (PTR_ERR(seg) == -EAGAIN)
+ xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+ return PTR_ERR(seg);
}
/* Register and XDR encode the Write list. Supports encoding a list
@@ -423,7 +428,7 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
true, &mr);
if (IS_ERR(seg))
- return PTR_ERR(seg);
+ goto out_maperr;
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
@@ -440,6 +445,11 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
*segcount = cpu_to_be32(nchunks);
return 0;
+
+out_maperr:
+ if (PTR_ERR(seg) == -EAGAIN)
+ xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+ return PTR_ERR(seg);
}
/* Register and XDR encode the Reply chunk. Supports encoding an array
@@ -481,7 +491,7 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
true, &mr);
if (IS_ERR(seg))
- return PTR_ERR(seg);
+ goto out_maperr;
rpcrdma_mr_push(mr, &req->rl_registered);
if (encode_rdma_segment(xdr, mr) < 0)
@@ -498,6 +508,11 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
*segcount = cpu_to_be32(nchunks);
return 0;
+
+out_maperr:
+ if (PTR_ERR(seg) == -EAGAIN)
+ xprt_wait_for_buffer_space(rqst->rq_task, NULL);
+ return PTR_ERR(seg);
}
/**
@@ -724,8 +739,8 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
* Returns:
* %0 if the RPC was sent successfully,
* %-ENOTCONN if the connection was lost,
- * %-EAGAIN if not enough pages are available for on-demand reply buffer,
- * %-ENOBUFS if no MRs are available to register chunks,
+ * %-EAGAIN if the caller should call again with the same arguments,
+ * %-ENOBUFS if the caller should call again after a delay,
* %-EMSGSIZE if the transport header is too small,
* %-EIO if a permanent problem occurred while marshaling.
*/
@@ -868,10 +883,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
return 0;
out_err:
- if (ret != -ENOBUFS) {
- pr_err("rpcrdma: header marshaling failed (%d)\n", ret);
- r_xprt->rx_stats.failed_marshal_count++;
- }
+ r_xprt->rx_stats.failed_marshal_count++;
return ret;
}
@@ -1366,7 +1378,7 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
- queue_work_on(req->rl_cpu, rpcrdma_receive_wq, &rep->rr_work);
+ queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
out_badstatus:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 4b1ecfe979cf..cc1aad325496 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -52,7 +52,6 @@
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/sunrpc/addr.h>
-#include <linux/smp.h>
#include "xprt_rdma.h"
@@ -237,8 +236,6 @@ rpcrdma_connect_worker(struct work_struct *work)
struct rpc_xprt *xprt = &r_xprt->rx_xprt;
spin_lock_bh(&xprt->transport_lock);
- if (++xprt->connect_cookie == 0) /* maintain a reserved value */
- ++xprt->connect_cookie;
if (ep->rep_connected > 0) {
if (!xprt_test_and_set_connected(xprt))
xprt_wake_pending_tasks(xprt, 0);
@@ -540,29 +537,6 @@ xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
}
}
-/* Allocate a fixed-size buffer in which to construct and send the
- * RPC-over-RDMA header for this request.
- */
-static bool
-rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
- gfp_t flags)
-{
- size_t size = RPCRDMA_HDRBUF_SIZE;
- struct rpcrdma_regbuf *rb;
-
- if (req->rl_rdmabuf)
- return true;
-
- rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, flags);
- if (IS_ERR(rb))
- return false;
-
- r_xprt->rx_stats.hardway_register_count += size;
- req->rl_rdmabuf = rb;
- xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
- return true;
-}
-
static bool
rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
size_t size, gfp_t flags)
@@ -644,15 +618,11 @@ xprt_rdma_allocate(struct rpc_task *task)
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
- if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
- goto out_fail;
if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
goto out_fail;
if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
goto out_fail;
- req->rl_cpu = smp_processor_id();
- req->rl_connect_cookie = 0; /* our reserved value */
rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
@@ -694,7 +664,8 @@ xprt_rdma_free(struct rpc_task *task)
* Returns:
* %0 if the RPC message has been sent
* %-ENOTCONN if the caller should reconnect and call again
- * %-ENOBUFS if the caller should call again later
+ * %-EAGAIN if the caller should call again
+ * %-ENOBUFS if the caller should call again after a delay
* %-EIO if a permanent error occurred and the request was not
* sent. Do not try to send this message again.
*/
@@ -723,9 +694,9 @@ xprt_rdma_send_request(struct rpc_task *task)
rpcrdma_recv_buffer_get(req);
/* Must suppress retransmit to maintain credits */
- if (req->rl_connect_cookie == xprt->connect_cookie)
+ if (rqst->rq_connect_cookie == xprt->connect_cookie)
goto drop_connection;
- req->rl_connect_cookie = xprt->connect_cookie;
+ rqst->rq_xtime = ktime_get();
__set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
@@ -733,6 +704,12 @@ xprt_rdma_send_request(struct rpc_task *task)
rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
rqst->rq_bytes_sent = 0;
+
+ /* An RPC with no reply will throw off credit accounting,
+ * so drop the connection to reset the credit grant.
+ */
+ if (!rpc_reply_expected(task))
+ goto drop_connection;
return 0;
failed_marshal:
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index e6f84a6434a0..fe5eaca2d197 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -250,11 +250,11 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
wait_for_completion(&ia->ri_remove_done);
ia->ri_id = NULL;
- ia->ri_pd = NULL;
ia->ri_device = NULL;
/* Return 1 to ensure the core destroys the id. */
return 1;
case RDMA_CM_EVENT_ESTABLISHED:
+ ++xprt->rx_xprt.connect_cookie;
connstate = 1;
rpcrdma_update_connect_private(xprt, &event->param.conn);
goto connected;
@@ -273,6 +273,7 @@ rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
connstate = -EAGAIN;
goto connected;
case RDMA_CM_EVENT_DISCONNECTED:
+ ++xprt->rx_xprt.connect_cookie;
connstate = -ECONNABORTED;
connected:
xprt->rx_buf.rb_credits = 1;
@@ -445,7 +446,9 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
ia->ri_id->qp = NULL;
}
ib_free_cq(ep->rep_attr.recv_cq);
+ ep->rep_attr.recv_cq = NULL;
ib_free_cq(ep->rep_attr.send_cq);
+ ep->rep_attr.send_cq = NULL;
/* The ULP is responsible for ensuring all DMA
* mappings and MRs are gone.
@@ -458,6 +461,8 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
rpcrdma_dma_unmap_regbuf(req->rl_recvbuf);
}
rpcrdma_mrs_destroy(buf);
+ ib_dealloc_pd(ia->ri_pd);
+ ia->ri_pd = NULL;
/* Allow waiters to continue */
complete(&ia->ri_remove_done);
@@ -589,11 +594,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
/* Client offers RDMA Read but does not initiate */
ep->rep_remote_cma.initiator_depth = 0;
- if (ia->ri_device->attrs.max_qp_rd_atom > 32) /* arbitrary but <= 255 */
- ep->rep_remote_cma.responder_resources = 32;
- else
- ep->rep_remote_cma.responder_resources =
- ia->ri_device->attrs.max_qp_rd_atom;
+ ep->rep_remote_cma.responder_resources =
+ min_t(int, U8_MAX, ia->ri_device->attrs.max_qp_rd_atom);
/* Limit transport retries so client can detect server
* GID changes quickly. RPC layer handles re-establishing
@@ -628,14 +630,16 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
cancel_delayed_work_sync(&ep->rep_connect_worker);
- if (ia->ri_id->qp) {
+ if (ia->ri_id && ia->ri_id->qp) {
rpcrdma_ep_disconnect(ep, ia);
rdma_destroy_qp(ia->ri_id);
ia->ri_id->qp = NULL;
}
- ib_free_cq(ep->rep_attr.recv_cq);
- ib_free_cq(ep->rep_attr.send_cq);
+ if (ep->rep_attr.recv_cq)
+ ib_free_cq(ep->rep_attr.recv_cq);
+ if (ep->rep_attr.send_cq)
+ ib_free_cq(ep->rep_attr.send_cq);
}
/* Re-establish a connection after a device removal event.
@@ -1024,7 +1028,7 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
LIST_HEAD(free);
LIST_HEAD(all);
- for (count = 0; count < 32; count++) {
+ for (count = 0; count < 3; count++) {
struct rpcrdma_mr *mr;
int rc;
@@ -1049,8 +1053,9 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
list_splice(&all, &buf->rb_all);
r_xprt->rx_stats.mrs_allocated += count;
spin_unlock(&buf->rb_mrlock);
-
trace_xprtrdma_createmrs(r_xprt, count);
+
+ xprt_write_space(&r_xprt->rx_xprt);
}
static void
@@ -1068,17 +1073,27 @@ struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+ struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+ rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
+ DMA_TO_DEVICE, GFP_KERNEL);
+ if (IS_ERR(rb)) {
+ kfree(req);
+ return ERR_PTR(-ENOMEM);
+ }
+ req->rl_rdmabuf = rb;
+ xdr_buf_init(&req->rl_hdrbuf, rb->rg_base, rdmab_length(rb));
+ req->rl_buffer = buffer;
+ INIT_LIST_HEAD(&req->rl_registered);
+
spin_lock(&buffer->rb_reqslock);
list_add(&req->rl_all, &buffer->rb_allreqs);
spin_unlock(&buffer->rb_reqslock);
- req->rl_buffer = &r_xprt->rx_buf;
- INIT_LIST_HEAD(&req->rl_registered);
return req;
}
@@ -1535,7 +1550,6 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct rpcrdma_req *req)
{
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
- struct ib_send_wr *send_wr_fail;
int rc;
if (req->rl_reply) {
@@ -1554,7 +1568,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
--ep->rep_send_count;
}
- rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
+ rc = ia->ri_ops->ro_send(ia, req);
trace_xprtrdma_post_send(req, rc);
if (rc)
return -ENOTCONN;
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 69883a960a3f..3d3b423fa9c1 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -334,8 +334,6 @@ enum {
struct rpcrdma_buffer;
struct rpcrdma_req {
struct list_head rl_list;
- int rl_cpu;
- unsigned int rl_connect_cookie;
struct rpcrdma_buffer *rl_buffer;
struct rpcrdma_rep *rl_reply;
struct xdr_stream rl_stream;
@@ -474,6 +472,8 @@ struct rpcrdma_memreg_ops {
(*ro_map)(struct rpcrdma_xprt *,
struct rpcrdma_mr_seg *, int, bool,
struct rpcrdma_mr **);
+ int (*ro_send)(struct rpcrdma_ia *ia,
+ struct rpcrdma_req *req);
void (*ro_reminv)(struct rpcrdma_rep *rep,
struct list_head *mrs);
void (*ro_unmap_sync)(struct rpcrdma_xprt *,