From 2d0abe36cf13fb7b577949fd1539326adddcc9bc Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:38 -0400 Subject: xprtrdma: Fix use-after-free in rpcrdma_post_recvs Dereference wr->next /before/ the memory backing wr has been released. This issue was found by code inspection. It is not expected to be a significant problem because it is in an error path that is almost never executed. Fixes: 7c8d9e7c8863 ("xprtrdma: Move Receive posting to ... ") Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 84bb37924540..e71315e9dca2 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1553,10 +1553,11 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); if (rc) { - for (wr = bad_wr; wr; wr = wr->next) { + for (wr = bad_wr; wr;) { struct rpcrdma_rep *rep; rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr); + wr = wr->next; rpcrdma_recv_buffer_put(rep); --count; } -- cgit v1.2.3 From 1310051c720a83c5717658bcbff710b260f2bff9 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:43 -0400 Subject: xprtrdma: Replace use of xdr_stream_pos in rpcrdma_marshal_req This is a latent bug. xdr_stream_pos works by subtracting xdr_stream::nwords from xdr_buf::len. But xdr_stream::nwords is not initialized by xdr_init_encode(). It works today only because all fields in rpcrdma_req::rl_stream are initialized to zero by rpcrdma_req_create, making the subtraction in xdr_stream_pos always a no-op. I found this issue via code inspection. It was introduced by commit 39f4cd9e9982 ("xprtrdma: Harden chunk list encoding against send buffer overflow"), but the code has changed enough since then that this fix can't be automatically applied to stable. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 85115a2e2639..97bfb804b6c6 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -867,12 +867,12 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) if (ret) goto out_err; - trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype); - - ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr), + ret = rpcrdma_prepare_send_sges(r_xprt, req, req->rl_hdrbuf.len, &rqst->rq_snd_buf, rtype); if (ret) goto out_err; + + trace_xprtrdma_marshal(req, rtype, wtype); return 0; out_err: -- cgit v1.2.3 From 05eb06d86685e7d9dac60e6bbb46d7f4c30b056e Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:48 -0400 Subject: xprtrdma: Fix occasional transport deadlock Under high I/O workloads, I've noticed that an RPC/RDMA transport occasionally deadlocks (IOPS goes to zero, and doesn't recover). Diagnosis shows that the sendctx queue is empty, but when sendctxs are returned to the queue, the xprt_write_space wake-up never occurs. The wake-up logic in rpcrdma_sendctx_put_locked is racy. I noticed that both EMPTY_SCQ and XPRT_WRITE_SPACE are implemented via an atomic bit. Just one of those is sufficient. Removing EMPTY_SCQ in favor of the generic bit mechanism makes the deadlock un-reproducible. Without EMPTY_SCQ, rpcrdma_buffer::rb_flags is no longer used and is therefore removed. Unfortunately this patch does not apply cleanly to stable. If needed, someone will have to port it and test it. Fixes: 2fad659209d5 ("xprtrdma: Wait on empty sendctx queue") Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 6 +++++- net/sunrpc/xprtrdma/rpc_rdma.c | 26 ++++++++++++-------------- net/sunrpc/xprtrdma/verbs.c | 11 +++-------- net/sunrpc/xprtrdma/xprt_rdma.h | 6 ------ 4 files changed, 20 insertions(+), 29 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 794ba4ca0994..ac47314fb751 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -391,7 +391,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, rpcrdma_mr_recycle(mr); mr = rpcrdma_mr_get(r_xprt); if (!mr) - return ERR_PTR(-EAGAIN); + goto out_getmr_err; } while (mr->frwr.fr_state != FRWR_IS_INVALID); frwr = &mr->frwr; frwr->fr_state = FRWR_IS_VALID; @@ -448,6 +448,10 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, *out = mr; return seg; +out_getmr_err: + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); + return ERR_PTR(-EAGAIN); + out_dmamap_err: mr->mr_dir = DMA_NONE; trace_xprtrdma_frwr_sgerr(mr, i); diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 97bfb804b6c6..59b214ba8813 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -699,22 +699,28 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, u32 hdrlen, struct xdr_buf *xdr, enum rpcrdma_chunktype rtype) { + int ret; + + ret = -EAGAIN; req->rl_sendctx = rpcrdma_sendctx_get_locked(r_xprt); if (!req->rl_sendctx) - return -EAGAIN; + goto err; req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + ret = -EIO; if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) - return -EIO; - + goto err; if (rtype != rpcrdma_areadch) if (!rpcrdma_prepare_msg_sges(r_xprt, req, xdr, rtype)) - return -EIO; - + goto err; return 0; + +err: + trace_xprtrdma_prepsend_failed(&req->rl_slot, ret); + return ret; } /** @@ -877,15 +883,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) out_err: trace_xprtrdma_marshal_failed(rqst, ret); - switch (ret) { - case -EAGAIN: - xprt_wait_for_buffer_space(rqst->rq_xprt); - break; - case -ENOBUFS: - break; - default: - r_xprt->rx_stats.failed_marshal_count++; - } + r_xprt->rx_stats.failed_marshal_count++; return ret; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index e71315e9dca2..0be5a36cacb6 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -901,7 +901,7 @@ out_emptyq: * completions recently. This is a sign the Send Queue is * backing up. Cause the caller to pause and try again. */ - set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags); + xprt_wait_for_buffer_space(&r_xprt->rx_xprt); r_xprt->rx_stats.empty_sendctx_q++; return NULL; } @@ -936,10 +936,7 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc) /* Paired with READ_ONCE */ smp_store_release(&buf->rb_sc_tail, next_tail); - if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) { - smp_mb__after_atomic(); - xprt_write_space(&sc->sc_xprt->rx_xprt); - } + xprt_write_space(&sc->sc_xprt->rx_xprt); } static void @@ -977,8 +974,6 @@ rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt) r_xprt->rx_stats.mrs_allocated += count; spin_unlock(&buf->rb_mrlock); trace_xprtrdma_createmrs(r_xprt, count); - - xprt_write_space(&r_xprt->rx_xprt); } static void @@ -990,6 +985,7 @@ rpcrdma_mr_refresh_worker(struct work_struct *work) rx_buf); rpcrdma_mrs_create(r_xprt); + xprt_write_space(&r_xprt->rx_xprt); } /** @@ -1089,7 +1085,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) struct rpcrdma_buffer *buf = &r_xprt->rx_buf; int i, rc; - buf->rb_flags = 0; buf->rb_max_requests = r_xprt->rx_ep.rep_max_requests; buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_mrlock); diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index d1e0749bcbc4..2c6c5d8c3de1 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -391,7 +391,6 @@ struct rpcrdma_buffer { struct list_head rb_recv_bufs; struct list_head rb_allreqs; - unsigned long rb_flags; u32 rb_max_requests; u32 rb_credits; /* most recent credit grant */ @@ -402,11 +401,6 @@ struct rpcrdma_buffer { struct delayed_work rb_refresh_worker; }; -/* rb_flags */ -enum { - RPCRDMA_BUF_F_EMPTY_SCQ = 0, -}; - /* * Statistics for RPCRDMA */ -- cgit v1.2.3 From 5809ea4f7c39bf38e3f85ec185b776da9d81717c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:54 -0400 Subject: xprtrdma: Remove the RPCRDMA_REQ_F_PENDING flag Commit 9590d083c1bb ("xprtrdma: Use xprt_pin_rqst in rpcrdma_reply_handler") pins incoming RPC/RDMA replies so they can be left in the pending requests queue while they are being processed without introducing a race between ->buf_free and the transport's reply handler. Therefore RPCRDMA_REQ_F_PENDING is no longer necessary. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 1 - net/sunrpc/xprtrdma/transport.c | 4 +--- net/sunrpc/xprtrdma/xprt_rdma.h | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 59b214ba8813..fbc0a9ff14b1 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1371,7 +1371,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) } req->rl_reply = rep; rep->rr_rqst = rqst; - clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); queue_work(buf->rb_completion_wq, &rep->rr_work); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 1f73a6a7e43c..f84375ddbb4d 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -618,8 +618,7 @@ xprt_rdma_free(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - if (test_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags)) - rpcrdma_release_rqst(r_xprt, req); + rpcrdma_release_rqst(r_xprt, req); trace_xprtrdma_op_free(task, req); } @@ -667,7 +666,6 @@ xprt_rdma_send_request(struct rpc_rqst *rqst) goto drop_connection; rqst->rq_xtime = ktime_get(); - __set_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags); if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req)) goto drop_connection; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 2c6c5d8c3de1..3c39aa3c113c 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -344,7 +344,6 @@ struct rpcrdma_req { /* rl_flags */ enum { - RPCRDMA_REQ_F_PENDING = 0, RPCRDMA_REQ_F_TX_RESOURCES, }; -- cgit v1.2.3 From 847568942f93e0af77e4bb8a098899f310cb3a88 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:32:59 -0400 Subject: xprtrdma: Remove fr_state Now that both the Send and Receive completions are handled in process context, it is safe to DMA unmap and return MRs to the free or recycle lists directly in the completion handlers. Doing this means rpcrdma_frwr no longer needs to track the state of each MR, meaning that a VALID or FLUSHED MR can no longer appear on an xprt's MR free list. Thus there is no longer a need to track the MR's registration state in rpcrdma_frwr. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 204 ++++++++++++++++++---------------------- net/sunrpc/xprtrdma/rpc_rdma.c | 2 +- net/sunrpc/xprtrdma/xprt_rdma.h | 11 +-- 3 files changed, 94 insertions(+), 123 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index ac47314fb751..5c480bc13075 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -168,7 +168,6 @@ int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr) goto out_list_err; mr->frwr.fr_mr = frmr; - mr->frwr.fr_state = FRWR_IS_INVALID; mr->mr_dir = DMA_NONE; INIT_LIST_HEAD(&mr->mr_list); INIT_WORK(&mr->mr_recycle, frwr_mr_recycle_worker); @@ -297,65 +296,6 @@ size_t frwr_maxpages(struct rpcrdma_xprt *r_xprt) (ia->ri_max_segs - 2) * ia->ri_max_frwr_depth); } -/** - * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - */ -static void -frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = - container_of(cqe, struct rpcrdma_frwr, fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_FR; - trace_xprtrdma_wc_fastreg(wc, frwr); -} - -/** - * frwr_wc_localinv - Invoked by RDMA provider for a flushed LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - */ -static void -frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_LI; - trace_xprtrdma_wc_li(wc, frwr); -} - -/** - * frwr_wc_localinv_wake - Invoked by RDMA provider for a signaled LocalInv WC - * @cq: completion queue (ignored) - * @wc: completed WR - * - * Awaken anyone waiting for an MR to finish being fenced. - */ -static void -frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) -{ - struct ib_cqe *cqe = wc->wr_cqe; - struct rpcrdma_frwr *frwr = container_of(cqe, struct rpcrdma_frwr, - fr_cqe); - - /* WARNING: Only wr_cqe and status are reliable at this point */ - if (wc->status != IB_WC_SUCCESS) - frwr->fr_state = FRWR_FLUSHED_LI; - trace_xprtrdma_wc_li_wake(wc, frwr); - complete(&frwr->fr_linv_done); -} - /** * frwr_map - Register a memory region * @r_xprt: controlling transport @@ -378,23 +318,15 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, { struct rpcrdma_ia *ia = &r_xprt->rx_ia; bool holes_ok = ia->ri_mrtype == IB_MR_TYPE_SG_GAPS; - struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; struct ib_mr *ibmr; struct ib_reg_wr *reg_wr; int i, n; u8 key; - mr = NULL; - do { - if (mr) - rpcrdma_mr_recycle(mr); - mr = rpcrdma_mr_get(r_xprt); - if (!mr) - goto out_getmr_err; - } while (mr->frwr.fr_state != FRWR_IS_INVALID); - frwr = &mr->frwr; - frwr->fr_state = FRWR_IS_VALID; + mr = rpcrdma_mr_get(r_xprt); + if (!mr) + goto out_getmr_err; if (nsegs > ia->ri_max_frwr_depth) nsegs = ia->ri_max_frwr_depth; @@ -423,7 +355,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, if (!mr->mr_nents) goto out_dmamap_err; - ibmr = frwr->fr_mr; + ibmr = mr->frwr.fr_mr; n = ib_map_mr_sg(ibmr, mr->mr_sg, mr->mr_nents, NULL, PAGE_SIZE); if (unlikely(n != mr->mr_nents)) goto out_mapmr_err; @@ -433,7 +365,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, key = (u8)(ibmr->rkey & 0x000000FF); ib_update_fast_reg_key(ibmr, ++key); - reg_wr = &frwr->fr_regwr; + reg_wr = &mr->frwr.fr_regwr; reg_wr->mr = ibmr; reg_wr->key = ibmr->rkey; reg_wr->access = writing ? @@ -464,6 +396,23 @@ out_mapmr_err: return ERR_PTR(-EIO); } +/** + * frwr_wc_fastreg - Invoked by RDMA provider for a flushed FastReg WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ +static void frwr_wc_fastreg(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_fastreg(wc, frwr); + /* The MR will get recycled when the associated req is retransmitted */ +} + /** * frwr_send - post Send WR containing the RPC Call message * @ia: interface adapter @@ -516,31 +465,72 @@ void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs) if (mr->mr_handle == rep->rr_inv_rkey) { list_del_init(&mr->mr_list); trace_xprtrdma_mr_remoteinv(mr); - mr->frwr.fr_state = FRWR_IS_INVALID; rpcrdma_mr_unmap_and_put(mr); break; /* only one invalidated MR per RPC */ } } +static void __frwr_release_mr(struct ib_wc *wc, struct rpcrdma_mr *mr) +{ + if (wc->status != IB_WC_SUCCESS) + rpcrdma_mr_recycle(mr); + else + rpcrdma_mr_unmap_and_put(mr); +} + /** - * frwr_unmap_sync - invalidate memory regions that were registered for @req - * @r_xprt: controlling transport - * @mrs: list of MRs to process + * frwr_wc_localinv - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * Sleeps until it is safe for the host CPU to access the - * previously mapped memory regions. + */ +static void frwr_wc_localinv(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li(wc, frwr); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_wc_localinv_wake - Invoked by RDMA provider for a LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR * - * Caller ensures that @mrs is not empty before the call. This - * function empties the list. + * Awaken anyone waiting for an MR to finish being fenced. */ -void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) +static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_wake(wc, frwr); + complete(&frwr->fr_linv_done); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_unmap_sync - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * Sleeps until it is safe for the host CPU to access the previously mapped + * memory regions. + */ +void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *first, **prev, *last; const struct ib_send_wr *bad_wr; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_frwr *frwr; struct rpcrdma_mr *mr; - int count, rc; + int rc; /* ORDER: Invalidate all of the MRs first * @@ -548,33 +538,32 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) * a single ib_post_send() call. */ frwr = NULL; - count = 0; prev = &first; - list_for_each_entry(mr, mrs, mr_list) { - mr->frwr.fr_state = FRWR_IS_INVALID; + while (!list_empty(&req->rl_registered)) { + mr = rpcrdma_mr_pop(&req->rl_registered); - frwr = &mr->frwr; trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; + frwr = &mr->frwr; frwr->fr_cqe.done = frwr_wc_localinv; last = &frwr->fr_invwr; - memset(last, 0, sizeof(*last)); + last->next = NULL; last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; last->ex.invalidate_rkey = mr->mr_handle; - count++; *prev = last; prev = &last->next; } - if (!frwr) - goto unmap; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain * are complete. */ - last->send_flags = IB_SEND_SIGNALED; frwr->fr_cqe.done = frwr_wc_localinv_wake; reinit_completion(&frwr->fr_linv_done); @@ -582,29 +571,20 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct list_head *mrs) * replaces the QP. The RPC reply handler won't call us * unless ri_id->qp is a valid pointer. */ - r_xprt->rx_stats.local_inv_needed++; bad_wr = NULL; - rc = ib_post_send(ia->ri_id->qp, first, &bad_wr); - if (bad_wr != first) - wait_for_completion(&frwr->fr_linv_done); - if (rc) - goto out_release; + rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); + trace_xprtrdma_post_send(req, rc); - /* ORDER: Now DMA unmap all of the MRs, and return - * them to the free MR list. + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so don't wait in that case. */ -unmap: - while (!list_empty(mrs)) { - mr = rpcrdma_mr_pop(mrs); - rpcrdma_mr_unmap_and_put(mr); - } - return; - -out_release: - pr_err("rpcrdma: FRWR invalidate ib_post_send returned %i\n", rc); + if (bad_wr != first) + wait_for_completion(&frwr->fr_linv_done); + if (!rc) + return; - /* Unmap and release the MRs in the LOCAL_INV WRs that did not - * get posted. + /* Recycle MRs in the LOCAL_INV chain that did not get posted. */ while (bad_wr) { frwr = container_of(bad_wr, struct rpcrdma_frwr, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index fbc0a9ff14b1..f23450b176dd 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1277,7 +1277,7 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * RPC has relinquished all its Send Queue entries. */ if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, &req->rl_registered); + frwr_unmap_sync(r_xprt, req); /* Ensure that any DMA mapped pages associated with * the Send of the RPC Call have been unmapped before diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 3c39aa3c113c..a9de116a5c1a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -240,17 +240,9 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ -enum rpcrdma_frwr_state { - FRWR_IS_INVALID, /* ready to be used */ - FRWR_IS_VALID, /* in use */ - FRWR_FLUSHED_FR, /* flushed FASTREG WR */ - FRWR_FLUSHED_LI, /* flushed LOCALINV WR */ -}; - struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; - enum rpcrdma_frwr_state fr_state; struct completion fr_linv_done; union { struct ib_reg_wr fr_regwr; @@ -567,8 +559,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr **mr); int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); -void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, - struct list_head *mrs); +void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); /* * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c -- cgit v1.2.3 From 40088f0e9b62d7fa033918b54ef45f8bf7d1ad1c Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:04 -0400 Subject: xprtrdma: Add mechanism to place MRs back on the free list When a marshal operation fails, any MRs that were already set up for that request are recycled. Recycling releases MRs and creates new ones, which is expensive. Since commit f2877623082b ("xprtrdma: Chain Send to FastReg WRs") was merged, recycling FRWRs is unnecessary. This is because before that commit, frwr_map had already posted FAST_REG Work Requests, so ownership of the MRs had already been passed to the NIC and thus dealing with them had to be delayed until they completed. Since that commit, however, FAST_REG WRs are posted at the same time as the Send WR. This means that if marshaling fails, we are certain the MRs are safe to simply unmap and place back on the free list because neither the Send nor the FAST_REG WRs have been posted yet. The kernel still has ownership of the MRs at this point. This reduces the total number of MRs that the xprt has to create under heavy workloads and makes the marshaling logic less brittle. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 20 ++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 1 + net/sunrpc/xprtrdma/xprt_rdma.h | 1 + 3 files changed, 22 insertions(+) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 5c480bc13075..524cac0a0715 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -144,6 +144,26 @@ frwr_mr_recycle_worker(struct work_struct *work) frwr_release_mr(mr); } +/* frwr_reset - Place MRs back on the free list + * @req: request to reset + * + * Used after a failed marshal. For FRWR, this means the MRs + * don't have to be fully released and recreated. + * + * NB: This is safe only as long as none of @req's MRs are + * involved with an ongoing asynchronous FAST_REG or LOCAL_INV + * Work Request. + */ +void frwr_reset(struct rpcrdma_req *req) +{ + while (!list_empty(&req->rl_registered)) { + struct rpcrdma_mr *mr; + + mr = rpcrdma_mr_pop(&req->rl_registered); + rpcrdma_mr_unmap_and_put(mr); + } +} + /** * frwr_init_mr - Initialize one MR * @ia: interface adapter diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index f23450b176dd..67d72d68ca6c 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -884,6 +884,7 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) out_err: trace_xprtrdma_marshal_failed(rqst, ret); r_xprt->rx_stats.failed_marshal_count++; + frwr_reset(req); return ret; } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a9de116a5c1a..a39652884308 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -549,6 +549,7 @@ rpcrdma_data_dir(bool writing) /* Memory registration calls xprtrdma/frwr_ops.c */ bool frwr_is_supported(struct ib_device *device); +void frwr_reset(struct rpcrdma_req *req); int frwr_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep); int frwr_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mr *mr); void frwr_release_mr(struct rpcrdma_mr *mr); -- cgit v1.2.3 From d8099feda4833bab96b1bf312e9e6aad6b771570 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:10 -0400 Subject: xprtrdma: Reduce context switching due to Local Invalidation Since commit ba69cd122ece ("xprtrdma: Remove support for FMR memory registration"), FRWR is the only supported memory registration mode. We can take advantage of the asynchronous nature of FRWR's LOCAL_INV Work Requests to get rid of the completion wait by having the LOCAL_INV completion handler take care of DMA unmapping MRs and waking the upper layer RPC waiter. This eliminates two context switches when local invalidation is necessary. As a side benefit, we will no longer need the per-xprt deferred completion work queue. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/frwr_ops.c | 103 +++++++++++++++++++++++++++++++++++++++- net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++------------ net/sunrpc/xprtrdma/verbs.c | 17 ------- net/sunrpc/xprtrdma/xprt_rdma.h | 8 ++-- 4 files changed, 136 insertions(+), 53 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 524cac0a0715..0b6dad7580a1 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -542,7 +542,10 @@ static void frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) * @req: rpcrdma_req with a non-empty list of MRs to process * * Sleeps until it is safe for the host CPU to access the previously mapped - * memory regions. + * memory regions. This guarantees that registered MRs are properly fenced + * from the server before the RPC consumer accesses the data in them. It + * also ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. */ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { @@ -616,3 +619,101 @@ void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) rpcrdma_mr_recycle(mr); } } + +/** + * frwr_wc_localinv_done - Invoked by RDMA provider for a signaled LOCAL_INV WC + * @cq: completion queue (ignored) + * @wc: completed WR + * + */ +static void frwr_wc_localinv_done(struct ib_cq *cq, struct ib_wc *wc) +{ + struct ib_cqe *cqe = wc->wr_cqe; + struct rpcrdma_frwr *frwr = + container_of(cqe, struct rpcrdma_frwr, fr_cqe); + struct rpcrdma_mr *mr = container_of(frwr, struct rpcrdma_mr, frwr); + + /* WARNING: Only wr_cqe and status are reliable at this point */ + trace_xprtrdma_wc_li_done(wc, frwr); + rpcrdma_complete_rqst(frwr->fr_req->rl_reply); + __frwr_release_mr(wc, mr); +} + +/** + * frwr_unmap_async - invalidate memory regions that were registered for @req + * @r_xprt: controlling transport instance + * @req: rpcrdma_req with a non-empty list of MRs to process + * + * This guarantees that registered MRs are properly fenced from the + * server before the RPC consumer accesses the data in them. It also + * ensures proper Send flow control: waking the next RPC waits until + * this RPC has relinquished all its Send Queue entries. + */ +void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct ib_send_wr *first, *last, **prev; + const struct ib_send_wr *bad_wr; + struct rpcrdma_frwr *frwr; + struct rpcrdma_mr *mr; + int rc; + + /* Chain the LOCAL_INV Work Requests and post them with + * a single ib_post_send() call. + */ + frwr = NULL; + prev = &first; + while (!list_empty(&req->rl_registered)) { + mr = rpcrdma_mr_pop(&req->rl_registered); + + trace_xprtrdma_mr_localinv(mr); + r_xprt->rx_stats.local_inv_needed++; + + frwr = &mr->frwr; + frwr->fr_cqe.done = frwr_wc_localinv; + frwr->fr_req = req; + last = &frwr->fr_invwr; + last->next = NULL; + last->wr_cqe = &frwr->fr_cqe; + last->sg_list = NULL; + last->num_sge = 0; + last->opcode = IB_WR_LOCAL_INV; + last->send_flags = IB_SEND_SIGNALED; + last->ex.invalidate_rkey = mr->mr_handle; + + *prev = last; + prev = &last->next; + } + + /* Strong send queue ordering guarantees that when the + * last WR in the chain completes, all WRs in the chain + * are complete. The last completion will wake up the + * RPC waiter. + */ + frwr->fr_cqe.done = frwr_wc_localinv_done; + + /* Transport disconnect drains the receive CQ before it + * replaces the QP. The RPC reply handler won't call us + * unless ri_id->qp is a valid pointer. + */ + bad_wr = NULL; + rc = ib_post_send(r_xprt->rx_ia.ri_id->qp, first, &bad_wr); + trace_xprtrdma_post_send(req, rc); + if (!rc) + return; + + /* Recycle MRs in the LOCAL_INV chain that did not get posted. + */ + while (bad_wr) { + frwr = container_of(bad_wr, struct rpcrdma_frwr, fr_invwr); + mr = container_of(frwr, struct rpcrdma_mr, frwr); + bad_wr = bad_wr->next; + + rpcrdma_mr_recycle(mr); + } + + /* The final LOCAL_INV WR in the chain is supposed to + * do the wake. If it was never posted, the wake will + * not happen, so wake here in that case. + */ + rpcrdma_complete_rqst(req->rl_reply); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 67d72d68ca6c..33b6e6a03f68 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -1268,24 +1268,15 @@ out_badheader: goto out; } -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +/* Ensure that any DMA mapped pages associated with + * the Send of the RPC Call have been unmapped before + * allowing the RPC to complete. This protects argument + * memory not controlled by the RPC client from being + * re-used before we're done with it. + */ +static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req) { - /* Invalidate and unmap the data payloads before waking - * the waiting application. This guarantees the memory - * regions are properly fenced from the server before the - * application accesses the data. It also ensures proper - * send flow control: waking the next RPC waits until this - * RPC has relinquished all its Send Queue entries. - */ - if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, req); - - /* Ensure that any DMA mapped pages associated with - * the Send of the RPC Call have been unmapped before - * allowing the RPC to complete. This protects argument - * memory not controlled by the RPC client from being - * re-used before we're done with it. - */ if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { r_xprt->rx_stats.reply_waits_for_send++; out_of_line_wait_on_bit(&req->rl_flags, @@ -1295,24 +1286,23 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) } } -/* Reply handling runs in the poll worker thread. Anything that - * might wait is deferred to a separate workqueue. +/** + * rpcrdma_release_rqst - Release hardware resources + * @r_xprt: controlling transport instance + * @req: request with resources to release + * */ -void rpcrdma_deferred_completion(struct work_struct *work) +void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct rpcrdma_rep *rep = - container_of(work, struct rpcrdma_rep, rr_work); - struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst); - struct rpcrdma_xprt *r_xprt = rep->rr_rxprt; + if (!list_empty(&req->rl_registered)) + frwr_unmap_sync(r_xprt, req); - trace_xprtrdma_defer_cmp(rep); - if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) - frwr_reminv(rep, &req->rl_registered); - rpcrdma_release_rqst(r_xprt, req); - rpcrdma_complete_rqst(rep); + rpcrdma_release_tx(r_xprt, req); } -/* Process received RPC/RDMA messages. +/** + * rpcrdma_reply_handler - Process received RPC/RDMA messages + * @rep: Incoming rpcrdma_rep object to process * * Errors must result in the RPC task either being awakened, or * allowed to timeout, to discover the errors at that time. @@ -1374,7 +1364,16 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_rqst = rqst; trace_xprtrdma_reply(rqst->rq_task, rep, req, credits); - queue_work(buf->rb_completion_wq, &rep->rr_work); + + if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) + frwr_reminv(rep, &req->rl_registered); + if (!list_empty(&req->rl_registered)) { + frwr_unmap_async(r_xprt, req); + /* LocalInv completion will complete the RPC */ + } else { + rpcrdma_release_tx(r_xprt, req); + rpcrdma_complete_rqst(rep); + } return; out_badversion: diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 0be5a36cacb6..c50a4b295bd7 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -89,14 +89,12 @@ static void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp); */ static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt) { - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; /* Flush Receives, then wait for deferred Reply work * to complete. */ ib_drain_rq(ia->ri_id->qp); - drain_workqueue(buf->rb_completion_wq); /* Deferred Reply processing might have scheduled * local invalidations. @@ -1056,7 +1054,6 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; - INIT_WORK(&rep->rr_work, rpcrdma_deferred_completion); rep->rr_recv_wr.next = NULL; rep->rr_recv_wr.wr_cqe = &rep->rr_cqe; rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; @@ -1117,15 +1114,6 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) if (rc) goto out; - buf->rb_completion_wq = alloc_workqueue("rpcrdma-%s", - WQ_MEM_RECLAIM | WQ_HIGHPRI, - 0, - r_xprt->rx_xprt.address_strings[RPC_DISPLAY_ADDR]); - if (!buf->rb_completion_wq) { - rc = -ENOMEM; - goto out; - } - return 0; out: rpcrdma_buffer_destroy(buf); @@ -1199,11 +1187,6 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { cancel_delayed_work_sync(&buf->rb_refresh_worker); - if (buf->rb_completion_wq) { - destroy_workqueue(buf->rb_completion_wq); - buf->rb_completion_wq = NULL; - } - rpcrdma_sendctxs_destroy(buf); while (!list_empty(&buf->rb_recv_bufs)) { diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a39652884308..e465221c9c96 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -202,10 +202,9 @@ struct rpcrdma_rep { bool rr_temp; struct rpcrdma_regbuf *rr_rdmabuf; struct rpcrdma_xprt *rr_rxprt; - struct work_struct rr_work; + struct rpc_rqst *rr_rqst; struct xdr_buf rr_hdrbuf; struct xdr_stream rr_stream; - struct rpc_rqst *rr_rqst; struct list_head rr_list; struct ib_recv_wr rr_recv_wr; }; @@ -240,10 +239,12 @@ struct rpcrdma_sendctx { * An external memory region is any buffer or page that is registered * on the fly (ie, not pre-registered). */ +struct rpcrdma_req; struct rpcrdma_frwr { struct ib_mr *fr_mr; struct ib_cqe fr_cqe; struct completion fr_linv_done; + struct rpcrdma_req *fr_req; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -388,7 +389,6 @@ struct rpcrdma_buffer { u32 rb_bc_srv_max_requests; u32 rb_bc_max_requests; - struct workqueue_struct *rb_completion_wq; struct delayed_work rb_refresh_worker; }; @@ -561,6 +561,7 @@ struct rpcrdma_mr_seg *frwr_map(struct rpcrdma_xprt *r_xprt, int frwr_send(struct rpcrdma_ia *ia, struct rpcrdma_req *req); void frwr_reminv(struct rpcrdma_rep *rep, struct list_head *mrs); void frwr_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); +void frwr_unmap_async(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); /* * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c @@ -585,7 +586,6 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req); -void rpcrdma_deferred_completion(struct work_struct *work); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) { -- cgit v1.2.3 From 0ab115237025f5e379620bbcd56a02697d07b002 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:15 -0400 Subject: xprtrdma: Wake RPCs directly in rpcrdma_wc_send path Eliminate a context switch in the path that handles RPC wake-ups when a Receive completion has to wait for a Send completion. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 61 ++++++++++++++++------------------------- net/sunrpc/xprtrdma/transport.c | 10 ++++++- net/sunrpc/xprtrdma/verbs.c | 3 +- net/sunrpc/xprtrdma/xprt_rdma.h | 12 ++------ 4 files changed, 36 insertions(+), 50 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 33b6e6a03f68..caf0b1950d76 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return 0; } +static void rpcrdma_sendctx_done(struct kref *kref) +{ + struct rpcrdma_req *req = + container_of(kref, struct rpcrdma_req, rl_kref); + struct rpcrdma_rep *rep = req->rl_reply; + + rpcrdma_complete_rqst(rep); + rep->rr_rxprt->rx_stats.reply_waits_for_send++; +} + /** * rpcrdma_sendctx_unmap - DMA-unmap Send buffer * @sc: sendctx containing SGEs to unmap @@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) { struct ib_sge *sge; + if (!sc->sc_unmap_count) + return; + /* The first two SGEs contain the transport header and * the inline buffer. These are always left mapped so * they can be cheaply re-used. @@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, DMA_TO_DEVICE); - if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, - &sc->sc_req->rl_flags)) - wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES); + kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done); } /* Prepare an SGE for the RPC-over-RDMA transport header. @@ -666,7 +677,7 @@ map_tail: out: sc->sc_wr.num_sge += sge_no; if (sc->sc_unmap_count) - __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + kref_get(&req->rl_kref); return true; out_regbuf: @@ -708,7 +719,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_req = req; - __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); + kref_init(&req->rl_kref); ret = -EIO; if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) @@ -1268,36 +1279,12 @@ out_badheader: goto out; } -/* Ensure that any DMA mapped pages associated with - * the Send of the RPC Call have been unmapped before - * allowing the RPC to complete. This protects argument - * memory not controlled by the RPC client from being - * re-used before we're done with it. - */ -static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req) +static void rpcrdma_reply_done(struct kref *kref) { - if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { - r_xprt->rx_stats.reply_waits_for_send++; - out_of_line_wait_on_bit(&req->rl_flags, - RPCRDMA_REQ_F_TX_RESOURCES, - bit_wait, - TASK_UNINTERRUPTIBLE); - } -} + struct rpcrdma_req *req = + container_of(kref, struct rpcrdma_req, rl_kref); -/** - * rpcrdma_release_rqst - Release hardware resources - * @r_xprt: controlling transport instance - * @req: request with resources to release - * - */ -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) -{ - if (!list_empty(&req->rl_registered)) - frwr_unmap_sync(r_xprt, req); - - rpcrdma_release_tx(r_xprt, req); + rpcrdma_complete_rqst(req->rl_reply); } /** @@ -1367,13 +1354,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) frwr_reminv(rep, &req->rl_registered); - if (!list_empty(&req->rl_registered)) { + if (!list_empty(&req->rl_registered)) frwr_unmap_async(r_xprt, req); /* LocalInv completion will complete the RPC */ - } else { - rpcrdma_release_tx(r_xprt, req); - rpcrdma_complete_rqst(rep); - } + else + kref_put(&req->rl_kref, rpcrdma_reply_done); return; out_badversion: diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index f84375ddbb4d..9575f1d8db07 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -618,8 +618,16 @@ xprt_rdma_free(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - rpcrdma_release_rqst(r_xprt, req); trace_xprtrdma_op_free(task, req); + + if (!list_empty(&req->rl_registered)) + frwr_unmap_sync(r_xprt, req); + + /* XXX: If the RPC is completing because of a signal and + * not because a reply was received, we ought to ensure + * that the Send completion has fired, so that memory + * involved with the Send is not still visible to the NIC. + */ } /** diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index c50a4b295bd7..4e22cc244149 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1462,8 +1462,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; int rc; - if (!ep->rep_send_count || - test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { + if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) { send_wr->send_flags |= IB_SEND_SIGNALED; ep->rep_send_count = ep->rep_send_batch; } else { diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e465221c9c96..5475f0dff22a 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -44,7 +44,8 @@ #include /* wait_queue_head_t, etc */ #include /* spinlock_t, etc */ -#include /* atomic_t, etc */ +#include /* atomic_t, etc */ +#include /* struct kref */ #include /* struct work_struct */ #include /* RDMA connection api */ @@ -329,17 +330,12 @@ struct rpcrdma_req { struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ struct list_head rl_all; - unsigned long rl_flags; + struct kref rl_kref; struct list_head rl_registered; /* registered segments */ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; -/* rl_flags */ -enum { - RPCRDMA_REQ_F_TX_RESOURCES, -}; - static inline struct rpcrdma_req * rpcr_to_rdmar(const struct rpc_rqst *rqst) { @@ -584,8 +580,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep); -void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, - struct rpcrdma_req *req); static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) { -- cgit v1.2.3 From 379d1bc5be373c920bcda16b9894ae99505ea127 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:20 -0400 Subject: xprtrdma: Simplify rpcrdma_rep_create Clean up. Commit 7c8d9e7c8863 ("xprtrdma: Move Receive posting to Receive handler") reduced the number of rpcrdma_rep_create call sites to one. After that commit, the backchannel code no longer invokes it. Therefore the free list logic added by commit d698c4a02ee0 ("xprtrdma: Fix backchannel allocation of extra rpcrdma_reps") is no longer necessary, and in fact adds some extra overhead that we can do without. Simply post any newly created reps. They will get added back to the rb_recv_bufs list when they subsequently complete. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 4e22cc244149..de6be101abf2 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1036,9 +1036,9 @@ out1: return NULL; } -static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) +static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, + bool temp) { - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_rep *rep; rep = kzalloc(sizeof(*rep), GFP_KERNEL); @@ -1049,9 +1049,9 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) DMA_FROM_DEVICE, GFP_KERNEL); if (!rep->rr_rdmabuf) goto out_free; + xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf), rdmab_length(rep->rr_rdmabuf)); - rep->rr_cqe.done = rpcrdma_wc_receive; rep->rr_rxprt = r_xprt; rep->rr_recv_wr.next = NULL; @@ -1059,16 +1059,12 @@ static bool rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt, bool temp) rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov; rep->rr_recv_wr.num_sge = 1; rep->rr_temp = temp; - - spin_lock(&buf->rb_lock); - list_add(&rep->rr_list, &buf->rb_recv_bufs); - spin_unlock(&buf->rb_lock); - return true; + return rep; out_free: kfree(rep); out: - return false; + return NULL; } /** @@ -1497,7 +1493,6 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) count = 0; wr = NULL; while (needed) { - struct rpcrdma_regbuf *rb; struct rpcrdma_rep *rep; spin_lock(&buf->rb_lock); @@ -1507,13 +1502,12 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) list_del(&rep->rr_list); spin_unlock(&buf->rb_lock); if (!rep) { - if (!rpcrdma_rep_create(r_xprt, temp)) + rep = rpcrdma_rep_create(r_xprt, temp); + if (!rep) break; - continue; } - rb = rep->rr_rdmabuf; - if (!rpcrdma_regbuf_dma_map(r_xprt, rb)) { + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) { rpcrdma_recv_buffer_put(rep); break; } -- cgit v1.2.3 From 9ef33ef5b628037b694a433f8af014a04bb38126 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:26 -0400 Subject: xprtrdma: Streamline rpcrdma_post_recvs rb_lock is contended between rpcrdma_buffer_create, rpcrdma_buffer_put, and rpcrdma_post_recvs. Commit e340c2d6ef2a ("xprtrdma: Reduce the doorbell rate (Receive)") causes rpcrdma_post_recvs to take the rb_lock repeatedly when it determines more Receives are needed. Streamline this code path so it takes the lock just once in most cases to build the Receive chain that is about to be posted. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/verbs.c | 59 +++++++++++++++++++++++++++++---------------- 1 file changed, 38 insertions(+), 21 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index de6be101abf2..3270c8ad1819 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1478,11 +1478,13 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ep *ep = &r_xprt->rx_ep; - struct ib_recv_wr *wr, *bad_wr; + struct ib_recv_wr *i, *wr, *bad_wr; + struct rpcrdma_rep *rep; int needed, count, rc; rc = 0; count = 0; + needed = buf->rb_credits + (buf->rb_bc_srv_max_requests << 1); if (ep->rep_receive_count > needed) goto out; @@ -1490,39 +1492,48 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) if (!temp) needed += RPCRDMA_MAX_RECV_BATCH; - count = 0; + /* fast path: all needed reps can be found on the free list */ wr = NULL; + spin_lock(&buf->rb_lock); while (needed) { - struct rpcrdma_rep *rep; - - spin_lock(&buf->rb_lock); rep = list_first_entry_or_null(&buf->rb_recv_bufs, struct rpcrdma_rep, rr_list); - if (likely(rep)) - list_del(&rep->rr_list); - spin_unlock(&buf->rb_lock); - if (!rep) { - rep = rpcrdma_rep_create(r_xprt, temp); - if (!rep) - break; - } + if (!rep) + break; - if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) { - rpcrdma_recv_buffer_put(rep); + list_del(&rep->rr_list); + rep->rr_recv_wr.next = wr; + wr = &rep->rr_recv_wr; + --needed; + } + spin_unlock(&buf->rb_lock); + + while (needed) { + rep = rpcrdma_rep_create(r_xprt, temp); + if (!rep) break; - } - trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); rep->rr_recv_wr.next = wr; wr = &rep->rr_recv_wr; - ++count; --needed; } - if (!count) + if (!wr) goto out; + for (i = wr; i; i = i->next) { + rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); + + if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) + goto release_wrs; + + trace_xprtrdma_post_recv(rep->rr_recv_wr.wr_cqe); + ++count; + } + rc = ib_post_recv(r_xprt->rx_ia.ri_id->qp, wr, (const struct ib_recv_wr **)&bad_wr); +out: + trace_xprtrdma_post_recvs(r_xprt, count, rc); if (rc) { for (wr = bad_wr; wr;) { struct rpcrdma_rep *rep; @@ -1534,6 +1545,12 @@ rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp) } } ep->rep_receive_count += count; -out: - trace_xprtrdma_post_recvs(r_xprt, count, rc); + return; + +release_wrs: + for (i = wr; i;) { + rep = container_of(i, struct rpcrdma_rep, rr_recv_wr); + i = i->next; + rpcrdma_recv_buffer_put(rep); + } } -- cgit v1.2.3 From 6a6c6def42469ce08023458ba439a8207fe87ae4 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:31 -0400 Subject: xprtrdma: Refactor chunk encoding Clean up. Move the "not present" case into the individual chunk encoders. This improves code organization and readability. The reason for the original organization was to optimize for the case where there there are no chunks. The optimization turned out to be inconsequential, so let's err on the side of code readability. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/rpc_rdma.c | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index caf0b1950d76..d3515d3efe81 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -366,6 +366,9 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, unsigned int pos; int nsegs; + if (rtype == rpcrdma_noch) + goto done; + pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; @@ -389,7 +392,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nsegs -= mr->mr_nents; } while (nsegs); - return 0; +done: + return encode_item_not_present(xdr); } /* Register and XDR encode the Write list. Supports encoding a list @@ -417,6 +421,9 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, int nsegs, nchunks; __be32 *segcount; + if (wtype != rpcrdma_writech) + goto done; + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, @@ -451,7 +458,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); - return 0; +done: + return encode_item_not_present(xdr); } /* Register and XDR encode the Reply chunk. Supports encoding an array @@ -476,6 +484,9 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, int nsegs, nchunks; __be32 *segcount; + if (wtype != rpcrdma_replych) + return encode_item_not_present(xdr); + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) @@ -859,28 +870,13 @@ rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) * send a Call message with a Position Zero Read chunk and a * regular Read chunk at the same time. */ - if (rtype != rpcrdma_noch) { - ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); - if (ret) - goto out_err; - } - ret = encode_item_not_present(xdr); + ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype); if (ret) goto out_err; - - if (wtype == rpcrdma_writech) { - ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); - if (ret) - goto out_err; - } - ret = encode_item_not_present(xdr); + ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype); if (ret) goto out_err; - - if (wtype != rpcrdma_replych) - ret = encode_item_not_present(xdr); - else - ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); + ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype); if (ret) goto out_err; -- cgit v1.2.3 From 5828cebad1c8d535f3c194439e394e92a2273fb2 Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:36 -0400 Subject: xprtrdma: Remove rpcrdma_req::rl_buffer Clean up. There is only one remaining function, rpcrdma_buffer_put(), that uses this field. Its caller can supply a pointer to the correct rpcrdma_buffer, enabling the removal of an 8-byte pointer field from a frequently-allocated shared data structure. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/xprtrdma/transport.c | 5 ++++- net/sunrpc/xprtrdma/verbs.c | 6 ++---- net/sunrpc/xprtrdma/xprt_rdma.h | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) (limited to 'net') diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 9575f1d8db07..3688e0782587 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -550,8 +550,11 @@ out_sleep: static void xprt_rdma_free_slot(struct rpc_xprt *xprt, struct rpc_rqst *rqst) { + struct rpcrdma_xprt *r_xprt = + container_of(xprt, struct rpcrdma_xprt, rx_xprt); + memset(rqst, 0, sizeof(*rqst)); - rpcrdma_buffer_put(rpcr_to_rdmar(rqst)); + rpcrdma_buffer_put(&r_xprt->rx_buf, rpcr_to_rdmar(rqst)); rpc_wake_up_next(&xprt->backlog); } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index 3270c8ad1819..805b1f35e1ca 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -1019,7 +1019,6 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size, if (!req->rl_recvbuf) goto out4; - req->rl_buffer = buffer; INIT_LIST_HEAD(&req->rl_registered); spin_lock(&buffer->rb_lock); list_add(&req->rl_all, &buffer->rb_allreqs); @@ -1299,13 +1298,12 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) /** * rpcrdma_buffer_put - Put request/reply buffers back into pool + * @buffers: buffer pool * @req: object to return * */ -void -rpcrdma_buffer_put(struct rpcrdma_req *req) +void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req) { - struct rpcrdma_buffer *buffers = req->rl_buffer; struct rpcrdma_rep *rep = req->rl_reply; req->rl_reply = NULL; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 5475f0dff22a..117e32816e4f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -320,7 +320,6 @@ struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_list; struct rpc_rqst rl_slot; - struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply; struct xdr_stream rl_stream; struct xdr_buf rl_hdrbuf; @@ -499,7 +498,8 @@ rpcrdma_mr_recycle(struct rpcrdma_mr *mr) } struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *); -void rpcrdma_buffer_put(struct rpcrdma_req *); +void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, + struct rpcrdma_req *req); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, -- cgit v1.2.3 From 675dd90ad0932f2c03912a5252458d792bd7033a Mon Sep 17 00:00:00 2001 From: Chuck Lever Date: Wed, 19 Jun 2019 10:33:42 -0400 Subject: xprtrdma: Modernize ops->connect Adapt and apply changes that were made to the TCP socket connect code. See the following commits for details on the purpose of these changes: Commit 7196dbb02ea0 ("SUNRPC: Allow changing of the TCP timeout parameters on the fly") Commit 3851f1cdb2b8 ("SUNRPC: Limit the reconnect backoff timer to the max RPC message timeout") Commit 02910177aede ("SUNRPC: Fix reconnection timeouts") Some common transport code is moved to xprt.c to satisfy the code duplication police. Signed-off-by: Chuck Lever Signed-off-by: Anna Schumaker --- net/sunrpc/sched.c | 1 + net/sunrpc/xprt.c | 32 ++++++++++++++++++++ net/sunrpc/xprtrdma/transport.c | 66 +++++++++++++++++++++++++++++++---------- net/sunrpc/xprtrdma/xprt_rdma.h | 1 + net/sunrpc/xprtsock.c | 23 ++------------ 5 files changed, 87 insertions(+), 36 deletions(-) (limited to 'net') diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index bb04ae52803a..5ad5dead7bfc 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -58,6 +58,7 @@ static struct rpc_wait_queue delay_queue; */ struct workqueue_struct *rpciod_workqueue __read_mostly; struct workqueue_struct *xprtiod_workqueue __read_mostly; +EXPORT_SYMBOL_GPL(xprtiod_workqueue); unsigned long rpc_task_timeout(const struct rpc_task *task) diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index ad21880d5601..b1f54b7ccc0c 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -850,6 +850,38 @@ void xprt_connect(struct rpc_task *task) xprt_release_write(xprt, task); } +/** + * xprt_reconnect_delay - compute the wait before scheduling a connect + * @xprt: transport instance + * + */ +unsigned long xprt_reconnect_delay(const struct rpc_xprt *xprt) +{ + unsigned long start, now = jiffies; + + start = xprt->stat.connect_start + xprt->reestablish_timeout; + if (time_after(start, now)) + return start - now; + return 0; +} +EXPORT_SYMBOL_GPL(xprt_reconnect_delay); + +/** + * xprt_reconnect_backoff - compute the new re-establish timeout + * @xprt: transport instance + * @init_to: initial reestablish timeout + * + */ +void xprt_reconnect_backoff(struct rpc_xprt *xprt, unsigned long init_to) +{ + xprt->reestablish_timeout <<= 1; + if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) + xprt->reestablish_timeout = xprt->max_reconnect_timeout; + if (xprt->reestablish_timeout < init_to) + xprt->reestablish_timeout = init_to; +} +EXPORT_SYMBOL_GPL(xprt_reconnect_backoff); + enum xprt_xid_rb_cmp { XID_RB_EQUAL, XID_RB_LEFT, diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 3688e0782587..4993aa49ecbe 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -298,6 +298,7 @@ xprt_rdma_destroy(struct rpc_xprt *xprt) module_put(THIS_MODULE); } +/* 60 second timeout, no retries */ static const struct rpc_timeout xprt_rdma_default_timeout = { .to_initval = 60 * HZ, .to_maxval = 60 * HZ, @@ -323,8 +324,9 @@ xprt_setup_rdma(struct xprt_create *args) if (!xprt) return ERR_PTR(-ENOMEM); - /* 60 second timeout, no retries */ xprt->timeout = &xprt_rdma_default_timeout; + xprt->connect_timeout = xprt->timeout->to_initval; + xprt->max_reconnect_timeout = xprt->timeout->to_maxval; xprt->bind_timeout = RPCRDMA_BIND_TO; xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; @@ -487,31 +489,64 @@ xprt_rdma_timer(struct rpc_xprt *xprt, struct rpc_task *task) } /** - * xprt_rdma_connect - try to establish a transport connection + * xprt_rdma_set_connect_timeout - set timeouts for establishing a connection + * @xprt: controlling transport instance + * @connect_timeout: reconnect timeout after client disconnects + * @reconnect_timeout: reconnect timeout after server disconnects + * + */ +static void xprt_rdma_tcp_set_connect_timeout(struct rpc_xprt *xprt, + unsigned long connect_timeout, + unsigned long reconnect_timeout) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + + trace_xprtrdma_op_set_cto(r_xprt, connect_timeout, reconnect_timeout); + + spin_lock(&xprt->transport_lock); + + if (connect_timeout < xprt->connect_timeout) { + struct rpc_timeout to; + unsigned long initval; + + to = *xprt->timeout; + initval = connect_timeout; + if (initval < RPCRDMA_INIT_REEST_TO << 1) + initval = RPCRDMA_INIT_REEST_TO << 1; + to.to_initval = initval; + to.to_maxval = initval; + r_xprt->rx_timeout = to; + xprt->timeout = &r_xprt->rx_timeout; + xprt->connect_timeout = connect_timeout; + } + + if (reconnect_timeout < xprt->max_reconnect_timeout) + xprt->max_reconnect_timeout = reconnect_timeout; + + spin_unlock(&xprt->transport_lock); +} + +/** + * xprt_rdma_connect - schedule an attempt to reconnect * @xprt: transport state - * @task: RPC scheduler context + * @task: RPC scheduler context (unused) * */ static void xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + unsigned long delay; trace_xprtrdma_op_connect(r_xprt); + + delay = 0; if (r_xprt->rx_ep.rep_connected != 0) { - /* Reconnect */ - schedule_delayed_work(&r_xprt->rx_connect_worker, - xprt->reestablish_timeout); - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO) - xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO; - else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO) - xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; - } else { - schedule_delayed_work(&r_xprt->rx_connect_worker, 0); - if (!RPC_IS_ASYNC(task)) - flush_delayed_work(&r_xprt->rx_connect_worker); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, RPCRDMA_INIT_REEST_TO); } + queue_delayed_work(xprtiod_workqueue, &r_xprt->rx_connect_worker, + delay); } /** @@ -769,6 +804,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { .send_request = xprt_rdma_send_request, .close = xprt_rdma_close, .destroy = xprt_rdma_destroy, + .set_connect_timeout = xprt_rdma_tcp_set_connect_timeout, .print_stats = xprt_rdma_print_stats, .enable_swap = xprt_rdma_enable_swap, .disable_swap = xprt_rdma_disable_swap, diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 117e32816e4f..8378f45d2da7 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -432,6 +432,7 @@ struct rpcrdma_xprt { struct rpcrdma_ep rx_ep; struct rpcrdma_buffer rx_buf; struct delayed_work rx_connect_worker; + struct rpc_timeout rx_timeout; struct rpcrdma_stats rx_stats; }; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index c69951ed2ebc..b154600085d6 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -2402,25 +2402,6 @@ out: xprt_wake_pending_tasks(xprt, status); } -static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt) -{ - unsigned long start, now = jiffies; - - start = xprt->stat.connect_start + xprt->reestablish_timeout; - if (time_after(start, now)) - return start - now; - return 0; -} - -static void xs_reconnect_backoff(struct rpc_xprt *xprt) -{ - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) - xprt->reestablish_timeout = xprt->max_reconnect_timeout; - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; -} - /** * xs_connect - connect a socket to a remote endpoint * @xprt: pointer to transport structure @@ -2450,8 +2431,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) /* Start by resetting any existing state */ xs_reset_transport(transport); - delay = xs_reconnect_delay(xprt); - xs_reconnect_backoff(xprt); + delay = xprt_reconnect_delay(xprt); + xprt_reconnect_backoff(xprt, XS_TCP_INIT_REEST_TO); } else dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); -- cgit v1.2.3