diff options
49 files changed, 1764 insertions, 899 deletions
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c index 618ced381a14..aaa2e8d3df6f 100644 --- a/fs/nfs/callback_proc.c +++ b/fs/nfs/callback_proc.c @@ -217,7 +217,8 @@ static u32 initiate_file_draining(struct nfs_client *clp, } if (pnfs_mark_matching_lsegs_return(lo, &free_me_list, - &args->cbl_range)) { + &args->cbl_range, + be32_to_cpu(args->cbl_stateid.seqid))) { rv = NFS4_OK; goto unlock; } @@ -500,8 +501,10 @@ __be32 nfs4_callback_sequence(struct cb_sequenceargs *args, cps->slot = slot; /* The ca_maxresponsesize_cached is 0 with no DRC */ - if (args->csa_cachethis != 0) - return htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE); + if (args->csa_cachethis != 0) { + status = htonl(NFS4ERR_REP_TOO_BIG_TO_CACHE); + goto out_unlock; + } /* * Check for pending referring calls. If a match is found, a diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c index 976c90608e56..d81f96aacd51 100644 --- a/fs/nfs/callback_xdr.c +++ b/fs/nfs/callback_xdr.c @@ -146,10 +146,16 @@ static __be32 decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) p = read_buf(xdr, NFS4_STATEID_SIZE); if (unlikely(p == NULL)) return htonl(NFS4ERR_RESOURCE); - memcpy(stateid, p, NFS4_STATEID_SIZE); + memcpy(stateid->data, p, NFS4_STATEID_SIZE); return 0; } +static __be32 decode_delegation_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_DELEGATION_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} + static __be32 decode_compound_hdr_arg(struct xdr_stream *xdr, struct cb_compound_hdr_arg *hdr) { __be32 *p; @@ -211,7 +217,7 @@ static __be32 decode_recall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr, __be32 *p; __be32 status; - status = decode_stateid(xdr, &args->stateid); + status = decode_delegation_stateid(xdr, &args->stateid); if (unlikely(status != 0)) goto out; p = read_buf(xdr, 4); @@ -227,6 +233,11 @@ out: } #if defined(CONFIG_NFS_V4_1) +static __be32 decode_layout_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_LAYOUT_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp, struct xdr_stream *xdr, @@ -263,7 +274,7 @@ static __be32 decode_layoutrecall_args(struct svc_rqst *rqstp, } p = xdr_decode_hyper(p, &args->cbl_range.offset); p = xdr_decode_hyper(p, &args->cbl_range.length); - status = decode_stateid(xdr, &args->cbl_stateid); + status = decode_layout_stateid(xdr, &args->cbl_stateid); if (unlikely(status != 0)) goto out; } else if (args->cbl_recall_type == RETURN_FSID) { diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c index 5166adcfc0fb..322c2585bc34 100644 --- a/fs/nfs/delegation.c +++ b/fs/nfs/delegation.c @@ -875,15 +875,16 @@ int nfs_delegations_present(struct nfs_client *clp) /** * nfs4_copy_delegation_stateid - Copy inode's state ID information - * @dst: stateid data structure to fill in * @inode: inode to check * @flags: delegation type requirement + * @dst: stateid data structure to fill in + * @cred: optional argument to retrieve credential * * Returns "true" and fills in "dst->data" * if inode had a delegation, * otherwise "false" is returned. */ -bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode, - fmode_t flags) +bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags, + nfs4_stateid *dst, struct rpc_cred **cred) { struct nfs_inode *nfsi = NFS_I(inode); struct nfs_delegation *delegation; @@ -896,6 +897,8 @@ bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode, if (ret) { nfs4_stateid_copy(dst, &delegation->stateid); nfs_mark_delegation_referenced(delegation); + if (cred) + *cred = get_rpccred(delegation->cred); } rcu_read_unlock(); return ret; diff --git a/fs/nfs/delegation.h b/fs/nfs/delegation.h index 333063e032f0..64724d252a79 100644 --- a/fs/nfs/delegation.h +++ b/fs/nfs/delegation.h @@ -56,7 +56,7 @@ void nfs_delegation_reap_unclaimed(struct nfs_client *clp); int nfs4_proc_delegreturn(struct inode *inode, struct rpc_cred *cred, const nfs4_stateid *stateid, int issync); int nfs4_open_delegation_recall(struct nfs_open_context *ctx, struct nfs4_state *state, const nfs4_stateid *stateid, fmode_t type); int nfs4_lock_delegation_recall(struct file_lock *fl, struct nfs4_state *state, const nfs4_stateid *stateid); -bool nfs4_copy_delegation_stateid(nfs4_stateid *dst, struct inode *inode, fmode_t flags); +bool nfs4_copy_delegation_stateid(struct inode *inode, fmode_t flags, nfs4_stateid *dst, struct rpc_cred **cred); void nfs_mark_delegation_referenced(struct nfs_delegation *delegation); int nfs4_have_delegation(struct inode *inode, fmode_t flags); diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c index 741a92c470bb..979b3c4dee6a 100644 --- a/fs/nfs/direct.c +++ b/fs/nfs/direct.c @@ -87,6 +87,7 @@ struct nfs_direct_req { int mirror_count; ssize_t count, /* bytes actually processed */ + max_count, /* max expected count */ bytes_left, /* bytes left to be sent */ io_start, /* start of IO */ error; /* any reported error */ @@ -123,6 +124,8 @@ nfs_direct_good_bytes(struct nfs_direct_req *dreq, struct nfs_pgio_header *hdr) int i; ssize_t count; + WARN_ON_ONCE(dreq->count >= dreq->max_count); + if (dreq->mirror_count == 1) { dreq->mirrors[hdr->pgio_mirror_idx].count += hdr->good_bytes; dreq->count += hdr->good_bytes; @@ -275,7 +278,7 @@ static void nfs_direct_release_pages(struct page **pages, unsigned int npages) void nfs_init_cinfo_from_dreq(struct nfs_commit_info *cinfo, struct nfs_direct_req *dreq) { - cinfo->lock = &dreq->inode->i_lock; + cinfo->inode = dreq->inode; cinfo->mds = &dreq->mds_cinfo; cinfo->ds = &dreq->ds_cinfo; cinfo->dreq = dreq; @@ -591,7 +594,7 @@ ssize_t nfs_file_direct_read(struct kiocb *iocb, struct iov_iter *iter) goto out_unlock; dreq->inode = inode; - dreq->bytes_left = count; + dreq->bytes_left = dreq->max_count = count; dreq->io_start = iocb->ki_pos; dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp)); l_ctx = nfs_get_lock_context(dreq->ctx); @@ -630,13 +633,13 @@ nfs_direct_write_scan_commit_list(struct inode *inode, struct list_head *list, struct nfs_commit_info *cinfo) { - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); #ifdef CONFIG_NFS_V4_1 if (cinfo->ds != NULL && cinfo->ds->nwritten != 0) NFS_SERVER(inode)->pnfs_curr_ld->recover_commit_reqs(list, cinfo); #endif nfs_scan_commit_list(&cinfo->mds->list, list, cinfo, 0); - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); } static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq) @@ -671,13 +674,13 @@ static void nfs_direct_write_reschedule(struct nfs_direct_req *dreq) if (!nfs_pageio_add_request(&desc, req)) { nfs_list_remove_request(req); nfs_list_add_request(req, &failed); - spin_lock(cinfo.lock); + spin_lock(&cinfo.inode->i_lock); dreq->flags = 0; if (desc.pg_error < 0) dreq->error = desc.pg_error; else dreq->error = -EIO; - spin_unlock(cinfo.lock); + spin_unlock(&cinfo.inode->i_lock); } nfs_release_request(req); } @@ -1023,7 +1026,7 @@ ssize_t nfs_file_direct_write(struct kiocb *iocb, struct iov_iter *iter) goto out_unlock; dreq->inode = inode; - dreq->bytes_left = iov_iter_count(iter); + dreq->bytes_left = dreq->max_count = iov_iter_count(iter); dreq->io_start = pos; dreq->ctx = get_nfs_open_context(nfs_file_open_context(iocb->ki_filp)); l_ctx = nfs_get_lock_context(dreq->ctx); diff --git a/fs/nfs/filelayout/filelayout.c b/fs/nfs/filelayout/filelayout.c index 3384dc8e6683..aa59757389dc 100644 --- a/fs/nfs/filelayout/filelayout.c +++ b/fs/nfs/filelayout/filelayout.c @@ -795,7 +795,7 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg, buckets[i].direct_verf.committed = NFS_INVALID_STABLE_HOW; } - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); if (cinfo->ds->nbuckets >= size) goto out; for (i = 0; i < cinfo->ds->nbuckets; i++) { @@ -811,7 +811,7 @@ filelayout_alloc_commit_info(struct pnfs_layout_segment *lseg, swap(cinfo->ds->buckets, buckets); cinfo->ds->nbuckets = size; out: - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); kfree(buckets); return 0; } @@ -890,6 +890,7 @@ filelayout_pg_init_read(struct nfs_pageio_descriptor *pgio, 0, NFS4_MAX_UINT64, IOMODE_READ, + false, GFP_KERNEL); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); @@ -915,6 +916,7 @@ filelayout_pg_init_write(struct nfs_pageio_descriptor *pgio, 0, NFS4_MAX_UINT64, IOMODE_RW, + false, GFP_NOFS); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c index 0cb1abd535e3..0e8018bc9880 100644 --- a/fs/nfs/flexfilelayout/flexfilelayout.c +++ b/fs/nfs/flexfilelayout/flexfilelayout.c @@ -26,6 +26,8 @@ #define FF_LAYOUT_POLL_RETRY_MAX (15*HZ) +static struct group_info *ff_zero_group; + static struct pnfs_layout_hdr * ff_layout_alloc_layout_hdr(struct inode *inode, gfp_t gfp_flags) { @@ -53,14 +55,15 @@ ff_layout_free_layout_hdr(struct pnfs_layout_hdr *lo) kfree(FF_LAYOUT_FROM_HDR(lo)); } -static int decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +static int decode_pnfs_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) { __be32 *p; p = xdr_inline_decode(xdr, NFS4_STATEID_SIZE); if (unlikely(p == NULL)) return -ENOBUFS; - memcpy(stateid, p, NFS4_STATEID_SIZE); + stateid->type = NFS4_PNFS_DS_STATEID_TYPE; + memcpy(stateid->data, p, NFS4_STATEID_SIZE); dprintk("%s: stateid id= [%x%x%x%x]\n", __func__, p[0], p[1], p[2], p[3]); return 0; @@ -211,10 +214,16 @@ static struct nfs4_ff_layout_mirror *ff_layout_alloc_mirror(gfp_t gfp_flags) static void ff_layout_free_mirror(struct nfs4_ff_layout_mirror *mirror) { + struct rpc_cred *cred; + ff_layout_remove_mirror(mirror); kfree(mirror->fh_versions); - if (mirror->cred) - put_rpccred(mirror->cred); + cred = rcu_access_pointer(mirror->ro_cred); + if (cred) + put_rpccred(cred); + cred = rcu_access_pointer(mirror->rw_cred); + if (cred) + put_rpccred(cred); nfs4_ff_layout_put_deviceid(mirror->mirror_ds); kfree(mirror); } @@ -290,6 +299,8 @@ ff_lseg_merge(struct pnfs_layout_segment *new, { u64 new_end, old_end; + if (test_bit(NFS_LSEG_LAYOUTRETURN, &old->pls_flags)) + return false; if (new->pls_range.iomode != old->pls_range.iomode) return false; old_end = pnfs_calc_offset_end(old->pls_range.offset, @@ -310,8 +321,6 @@ ff_lseg_merge(struct pnfs_layout_segment *new, new_end); if (test_bit(NFS_LSEG_ROC, &old->pls_flags)) set_bit(NFS_LSEG_ROC, &new->pls_flags); - if (test_bit(NFS_LSEG_LAYOUTRETURN, &old->pls_flags)) - set_bit(NFS_LSEG_LAYOUTRETURN, &new->pls_flags); return true; } @@ -407,8 +416,9 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, struct nfs4_ff_layout_mirror *mirror; struct nfs4_deviceid devid; struct nfs4_deviceid_node *idnode; - u32 ds_count; - u32 fh_count; + struct auth_cred acred = { .group_info = ff_zero_group }; + struct rpc_cred __rcu *cred; + u32 ds_count, fh_count, id; int j; rc = -EIO; @@ -456,7 +466,7 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, fls->mirror_array[i]->efficiency = be32_to_cpup(p); /* stateid */ - rc = decode_stateid(&stream, &fls->mirror_array[i]->stateid); + rc = decode_pnfs_stateid(&stream, &fls->mirror_array[i]->stateid); if (rc) goto out_err_free; @@ -484,24 +494,49 @@ ff_layout_alloc_lseg(struct pnfs_layout_hdr *lh, fls->mirror_array[i]->fh_versions_cnt = fh_count; /* user */ - rc = decode_name(&stream, &fls->mirror_array[i]->uid); + rc = decode_name(&stream, &id); if (rc) goto out_err_free; + acred.uid = make_kuid(&init_user_ns, id); + /* group */ - rc = decode_name(&stream, &fls->mirror_array[i]->gid); + rc = decode_name(&stream, &id); if (rc) goto out_err_free; + acred.gid = make_kgid(&init_user_ns, id); + + /* find the cred for it */ + rcu_assign_pointer(cred, rpc_lookup_generic_cred(&acred, 0, gfp_flags)); + if (IS_ERR(cred)) { + rc = PTR_ERR(cred); + goto out_err_free; + } + + if (lgr->range.iomode == IOMODE_READ) + rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred); + else + rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred); + mirror = ff_layout_add_mirror(lh, fls->mirror_array[i]); if (mirror != fls->mirror_array[i]) { + /* swap cred ptrs so free_mirror will clean up old */ + if (lgr->range.iomode == IOMODE_READ) { + cred = xchg(&mirror->ro_cred, cred); + rcu_assign_pointer(fls->mirror_array[i]->ro_cred, cred); + } else { + cred = xchg(&mirror->rw_cred, cred); + rcu_assign_pointer(fls->mirror_array[i]->rw_cred, cred); + } ff_layout_free_mirror(fls->mirror_array[i]); fls->mirror_array[i] = mirror; } - dprintk("%s: uid %d gid %d\n", __func__, - fls->mirror_array[i]->uid, - fls->mirror_array[i]->gid); + dprintk("%s: iomode %s uid %u gid %u\n", __func__, + lgr->range.iomode == IOMODE_READ ? "READ" : "RW", + from_kuid(&init_user_ns, acred.uid), + from_kgid(&init_user_ns, acred.gid)); } p = xdr_inline_decode(&stream, 4); @@ -745,7 +780,7 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg, else { int i; - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); if (cinfo->ds->nbuckets != 0) kfree(buckets); else { @@ -759,7 +794,7 @@ ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg, NFS_INVALID_STABLE_HOW; } } - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); return 0; } } @@ -786,6 +821,36 @@ ff_layout_choose_best_ds_for_read(struct pnfs_layout_segment *lseg, } static void +ff_layout_pg_get_read(struct nfs_pageio_descriptor *pgio, + struct nfs_page *req, + bool strict_iomode) +{ +retry_strict: + pnfs_put_lseg(pgio->pg_lseg); + pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode, + req->wb_context, + 0, + NFS4_MAX_UINT64, + IOMODE_READ, + strict_iomode, + GFP_KERNEL); + if (IS_ERR(pgio->pg_lseg)) { + pgio->pg_error = PTR_ERR(pgio->pg_lseg); + pgio->pg_lseg = NULL; + } + + /* If we don't have checking, do get a IOMODE_RW + * segment, and the server wants to avoid READs + * there, then retry! + */ + if (pgio->pg_lseg && !strict_iomode && + ff_layout_avoid_read_on_rw(pgio->pg_lseg)) { + strict_iomode = true; + goto retry_strict; + } +} + +static void ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *req) { @@ -795,26 +860,23 @@ ff_layout_pg_init_read(struct nfs_pageio_descriptor *pgio, int ds_idx; /* Use full layout for now */ - if (!pgio->pg_lseg) { - pgio->pg_lseg = pnfs_update_layout(pgio->pg_inode, - req->wb_context, - 0, - NFS4_MAX_UINT64, - IOMODE_READ, - GFP_KERNEL); - if (IS_ERR(pgio->pg_lseg)) { - pgio->pg_error = PTR_ERR(pgio->pg_lseg); - pgio->pg_lseg = NULL; - return; - } - } + if (!pgio->pg_lseg) + ff_layout_pg_get_read(pgio, req, false); + else if (ff_layout_avoid_read_on_rw(pgio->pg_lseg)) + ff_layout_pg_get_read(pgio, req, true); + /* If no lseg, fall back to read through mds */ if (pgio->pg_lseg == NULL) goto out_mds; ds = ff_layout_choose_best_ds_for_read(pgio->pg_lseg, 0, &ds_idx); - if (!ds) - goto out_mds; + if (!ds) { + if (ff_layout_no_fallback_to_mds(pgio->pg_lseg)) + goto out_pnfs; + else + goto out_mds; + } + mirror = FF_LAYOUT_COMP(pgio->pg_lseg, ds_idx); pgio->pg_mirror_idx = ds_idx; @@ -828,6 +890,12 @@ out_mds: pnfs_put_lseg(pgio->pg_lseg); pgio->pg_lseg = NULL; nfs_pageio_reset_read_mds(pgio); + return; + +out_pnfs: + pnfs_set_lo_fail(pgio->pg_lseg); + pnfs_put_lseg(pgio->pg_lseg); + pgio->pg_lseg = NULL; } static void @@ -847,6 +915,7 @@ ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio, 0, NFS4_MAX_UINT64, IOMODE_RW, + false, GFP_NOFS); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); @@ -870,8 +939,12 @@ ff_layout_pg_init_write(struct nfs_pageio_descriptor *pgio, for (i = 0; i < pgio->pg_mirror_count; i++) { ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, i, true); - if (!ds) - goto out_mds; + if (!ds) { + if (ff_layout_no_fallback_to_mds(pgio->pg_lseg)) + goto out_pnfs; + else + goto out_mds; + } pgm = &pgio->pg_mirrors[i]; mirror = FF_LAYOUT_COMP(pgio->pg_lseg, i); pgm->pg_bsize = mirror->mirror_ds->ds_versions[0].wsize; @@ -883,6 +956,12 @@ out_mds: pnfs_put_lseg(pgio->pg_lseg); pgio->pg_lseg = NULL; nfs_pageio_reset_write_mds(pgio); + return; + +out_pnfs: + pnfs_set_lo_fail(pgio->pg_lseg); + pnfs_put_lseg(pgio->pg_lseg); + pgio->pg_lseg = NULL; } static unsigned int @@ -895,6 +974,7 @@ ff_layout_pg_get_mirror_count_write(struct nfs_pageio_descriptor *pgio, 0, NFS4_MAX_UINT64, IOMODE_RW, + false, GFP_NOFS); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); @@ -1067,8 +1147,7 @@ static int ff_layout_async_handle_error_v4(struct rpc_task *task, rpc_wake_up(&tbl->slot_tbl_waitq); /* fall through */ default: - if (ff_layout_no_fallback_to_mds(lseg) || - ff_layout_has_available_ds(lseg)) + if (ff_layout_avoid_mds_available_ds(lseg)) return -NFS4ERR_RESET_TO_PNFS; reset: dprintk("%s Retry through MDS. Error %d\n", __func__, @@ -1215,8 +1294,6 @@ static int ff_layout_read_done_cb(struct rpc_task *task, hdr->pgio_mirror_idx + 1, &hdr->pgio_mirror_idx)) goto out_eagain; - set_bit(NFS_LAYOUT_RETURN_REQUESTED, - &hdr->lseg->pls_layout->plh_flags); pnfs_read_resend_pnfs(hdr); return task->tk_status; case -NFS4ERR_RESET_TO_MDS: @@ -1260,7 +1337,7 @@ ff_layout_set_layoutcommit(struct nfs_pgio_header *hdr) } static bool -ff_layout_reset_to_mds(struct pnfs_layout_segment *lseg, int idx) +ff_layout_device_unavailable(struct pnfs_layout_segment *lseg, int idx) { /* No mirroring for now */ struct nfs4_deviceid_node *node = FF_LAYOUT_DEVID_NODE(lseg, idx); @@ -1297,16 +1374,10 @@ static int ff_layout_read_prepare_common(struct rpc_task *task, rpc_exit(task, -EIO); return -EIO; } - if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) { - dprintk("%s task %u reset io to MDS\n", __func__, task->tk_pid); - if (ff_layout_has_available_ds(hdr->lseg)) - pnfs_read_resend_pnfs(hdr); - else - ff_layout_reset_read(hdr); - rpc_exit(task, 0); + if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) { + rpc_exit(task, -EHOSTDOWN); return -EAGAIN; } - hdr->pgio_done_cb = ff_layout_read_done_cb; ff_layout_read_record_layoutstats_start(task, hdr); return 0; @@ -1496,14 +1567,8 @@ static int ff_layout_write_prepare_common(struct rpc_task *task, return -EIO; } - if (ff_layout_reset_to_mds(hdr->lseg, hdr->pgio_mirror_idx)) { - bool retry_pnfs; - - retry_pnfs = ff_layout_has_available_ds(hdr->lseg); - dprintk("%s task %u reset io to %s\n", __func__, - task->tk_pid, retry_pnfs ? "pNFS" : "MDS"); - ff_layout_reset_write(hdr, retry_pnfs); - rpc_exit(task, 0); + if (ff_layout_device_unavailable(hdr->lseg, hdr->pgio_mirror_idx)) { + rpc_exit(task, -EHOSTDOWN); return -EAGAIN; } @@ -1712,7 +1777,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr) goto out_failed; ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred); - if (IS_ERR(ds_cred)) + if (!ds_cred) goto out_failed; vers = nfs4_ff_layout_ds_version(lseg, idx); @@ -1720,6 +1785,7 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr) dprintk("%s USE DS: %s cl_count %d vers %d\n", __func__, ds->ds_remotestr, atomic_read(&ds->ds_clp->cl_count), vers); + hdr->pgio_done_cb = ff_layout_read_done_cb; atomic_inc(&ds->ds_clp->cl_count); hdr->ds_clp = ds->ds_clp; fh = nfs4_ff_layout_select_ds_fh(lseg, idx); @@ -1737,11 +1803,11 @@ ff_layout_read_pagelist(struct nfs_pgio_header *hdr) vers == 3 ? &ff_layout_read_call_ops_v3 : &ff_layout_read_call_ops_v4, 0, RPC_TASK_SOFTCONN); - + put_rpccred(ds_cred); return PNFS_ATTEMPTED; out_failed: - if (ff_layout_has_available_ds(lseg)) + if (ff_layout_avoid_mds_available_ds(lseg)) return PNFS_TRY_AGAIN; return PNFS_NOT_ATTEMPTED; } @@ -1769,7 +1835,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync) return PNFS_NOT_ATTEMPTED; ds_cred = ff_layout_get_ds_cred(lseg, idx, hdr->cred); - if (IS_ERR(ds_cred)) + if (!ds_cred) return PNFS_NOT_ATTEMPTED; vers = nfs4_ff_layout_ds_version(lseg, idx); @@ -1798,6 +1864,7 @@ ff_layout_write_pagelist(struct nfs_pgio_header *hdr, int sync) vers == 3 ? &ff_layout_write_call_ops_v3 : &ff_layout_write_call_ops_v4, sync, RPC_TASK_SOFTCONN); + put_rpccred(ds_cred); return PNFS_ATTEMPTED; } @@ -1824,7 +1891,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how) struct rpc_clnt *ds_clnt; struct rpc_cred *ds_cred; u32 idx; - int vers; + int vers, ret; struct nfs_fh *fh; idx = calc_ds_index_from_commit(lseg, data->ds_commit_index); @@ -1838,7 +1905,7 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how) goto out_err; ds_cred = ff_layout_get_ds_cred(lseg, idx, data->cred); - if (IS_ERR(ds_cred)) + if (!ds_cred) goto out_err; vers = nfs4_ff_layout_ds_version(lseg, idx); @@ -1854,10 +1921,12 @@ static int ff_layout_initiate_commit(struct nfs_commit_data *data, int how) if (fh) data->args.fh = fh; - return nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops, + ret = nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops, vers == 3 ? &ff_layout_commit_call_ops_v3 : &ff_layout_commit_call_ops_v4, how, RPC_TASK_SOFTCONN); + put_rpccred(ds_cred); + return ret; out_err: pnfs_generic_prepare_to_resend_writes(data); pnfs_generic_commit_release(data); @@ -2223,6 +2292,11 @@ static int __init nfs4flexfilelayout_init(void) { printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Registering...\n", __func__); + if (!ff_zero_group) { + ff_zero_group = groups_alloc(0); + if (!ff_zero_group) + return -ENOMEM; + } return pnfs_register_layoutdriver(&flexfilelayout_type); } @@ -2231,6 +2305,10 @@ static void __exit nfs4flexfilelayout_exit(void) printk(KERN_INFO "%s: NFSv4 Flexfile Layout Driver Unregistering...\n", __func__); pnfs_unregister_layoutdriver(&flexfilelayout_type); + if (ff_zero_group) { + put_group_info(ff_zero_group); + ff_zero_group = NULL; + } } MODULE_ALIAS("nfs-layouttype4-4"); diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h index dd353bb7dc0a..1bcdb15d0c41 100644 --- a/fs/nfs/flexfilelayout/flexfilelayout.h +++ b/fs/nfs/flexfilelayout/flexfilelayout.h @@ -10,7 +10,8 @@ #define FS_NFS_NFS4FLEXFILELAYOUT_H #define FF_FLAGS_NO_LAYOUTCOMMIT 1 -#define FF_FLAGS_NO_IO_THRU_MDS 2 +#define FF_FLAGS_NO_IO_THRU_MDS 2 +#define FF_FLAGS_NO_READ_IO 4 #include "../pnfs.h" @@ -76,9 +77,8 @@ struct nfs4_ff_layout_mirror { u32 fh_versions_cnt; struct nfs_fh *fh_versions; nfs4_stateid stateid; - u32 uid; - u32 gid; - struct rpc_cred *cred; + struct rpc_cred __rcu *ro_cred; + struct rpc_cred __rcu *rw_cred; atomic_t ref; spinlock_t lock; struct nfs4_ff_layoutstat read_stat; @@ -154,6 +154,12 @@ ff_layout_no_fallback_to_mds(struct pnfs_layout_segment *lseg) } static inline bool +ff_layout_no_read_on_rw(struct pnfs_layout_segment *lseg) +{ + return FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_READ_IO; +} + +static inline bool ff_layout_test_devid_unavailable(struct nfs4_deviceid_node *node) { return nfs4_test_deviceid_unavailable(node); @@ -192,4 +198,7 @@ nfs4_ff_find_or_create_ds_client(struct pnfs_layout_segment *lseg, struct rpc_cred *ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx, struct rpc_cred *mdscred); bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg); +bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg); +bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg); + #endif /* FS_NFS_NFS4FLEXFILELAYOUT_H */ diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c index add0e5a70bd6..0aa36be71fce 100644 --- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c +++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c @@ -228,7 +228,8 @@ ff_ds_error_match(const struct nfs4_ff_layout_ds_err *e1, return e1->opnum < e2->opnum ? -1 : 1; if (e1->status != e2->status) return e1->status < e2->status ? -1 : 1; - ret = memcmp(&e1->stateid, &e2->stateid, sizeof(e1->stateid)); + ret = memcmp(e1->stateid.data, e2->stateid.data, + sizeof(e1->stateid.data)); if (ret != 0) return ret; ret = memcmp(&e1->deviceid, &e2->deviceid, sizeof(e1->deviceid)); @@ -302,40 +303,26 @@ int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo, return 0; } -/* currently we only support AUTH_NONE and AUTH_SYS */ -static rpc_authflavor_t -nfs4_ff_layout_choose_authflavor(struct nfs4_ff_layout_mirror *mirror) +static struct rpc_cred * +ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode) { - if (mirror->uid == (u32)-1) - return RPC_AUTH_NULL; - return RPC_AUTH_UNIX; -} + struct rpc_cred *cred, __rcu **pcred; -/* fetch cred for NFSv3 DS */ -static int ff_layout_update_mirror_cred(struct nfs4_ff_layout_mirror *mirror, - struct nfs4_pnfs_ds *ds) -{ - if (ds->ds_clp && !mirror->cred && - mirror->mirror_ds->ds_versions[0].version == 3) { - struct rpc_auth *auth = ds->ds_clp->cl_rpcclient->cl_auth; - struct rpc_cred *cred; - struct auth_cred acred = { - .uid = make_kuid(&init_user_ns, mirror->uid), - .gid = make_kgid(&init_user_ns, mirror->gid), - }; - - /* AUTH_NULL ignores acred */ - cred = auth->au_ops->lookup_cred(auth, &acred, 0); - if (IS_ERR(cred)) { - dprintk("%s: lookup_cred failed with %ld\n", - __func__, PTR_ERR(cred)); - return PTR_ERR(cred); - } else { - if (cmpxchg(&mirror->cred, NULL, cred)) - put_rpccred(cred); - } - } - return 0; + if (iomode == IOMODE_READ) + pcred = &mirror->ro_cred; + else + pcred = &mirror->rw_cred; + + rcu_read_lock(); + do { + cred = rcu_dereference(*pcred); + if (!cred) + break; + + cred = get_rpccred_rcu(cred); + } while(!cred); + rcu_read_unlock(); + return cred; } struct nfs_fh * @@ -356,7 +343,23 @@ out: return fh; } -/* Upon return, either ds is connected, or ds is NULL */ +/** + * nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call + * @lseg: the layout segment we're operating on + * @ds_idx: index of the DS to use + * @fail_return: return layout on connect failure? + * + * Try to prepare a DS connection to accept an RPC call. This involves + * selecting a mirror to use and connecting the client to it if it's not + * already connected. + * + * Since we only need a single functioning mirror to satisfy a read, we don't + * want to return the layout if there is one. For writes though, any down + * mirror should result in a LAYOUTRETURN. @fail_return is how we distinguish + * between the two cases. + * + * Returns a pointer to a connected DS object on success or NULL on failure. + */ struct nfs4_pnfs_ds * nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, bool fail_return) @@ -367,7 +370,6 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, struct inode *ino = lseg->pls_layout->plh_inode; struct nfs_server *s = NFS_SERVER(ino); unsigned int max_payload; - rpc_authflavor_t flavor; if (!ff_layout_mirror_valid(lseg, mirror)) { pr_err_ratelimited("NFS: %s: No data server for offset index %d\n", @@ -383,9 +385,7 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, /* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */ smp_rmb(); if (ds->ds_clp) - goto out_update_creds; - - flavor = nfs4_ff_layout_choose_authflavor(mirror); + goto out; /* FIXME: For now we assume the server sent only one version of NFS * to use for the DS. @@ -394,7 +394,7 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, dataserver_retrans, mirror->mirror_ds->ds_versions[0].version, mirror->mirror_ds->ds_versions[0].minor_version, - flavor); + RPC_AUTH_UNIX); /* connect success, check rsize/wsize limit */ if (ds->ds_clp) { @@ -410,20 +410,10 @@ nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg, u32 ds_idx, mirror, lseg->pls_range.offset, lseg->pls_range.length, NFS4ERR_NXIO, OP_ILLEGAL, GFP_NOIO); - if (!fail_return) { - if (ff_layout_has_available_ds(lseg)) - set_bit(NFS_LAYOUT_RETURN_REQUESTED, - &lseg->pls_layout->plh_flags); - else - pnfs_error_mark_layout_for_return(ino, lseg); - } else + if (fail_return || !ff_layout_has_available_ds(lseg)) pnfs_error_mark_layout_for_return(ino, lseg); ds = NULL; - goto out; } -out_update_creds: - if (ff_layout_update_mirror_cred(mirror, ds)) - ds = NULL; out: return ds; } @@ -433,16 +423,15 @@ ff_layout_get_ds_cred(struct pnfs_layout_segment *lseg, u32 ds_idx, struct rpc_cred *mdscred) { struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx); - struct rpc_cred *cred = ERR_PTR(-EINVAL); - - if (!nfs4_ff_layout_prepare_ds(lseg, ds_idx, true)) - goto out; + struct rpc_cred *cred; - if (mirror && mirror->cred) - cred = mirror->cred; - else - cred = mdscred; -out: + if (mirror) { + cred = ff_layout_get_mirror_cred(mirror, lseg->pls_range.iomode); + if (!cred) + cred = get_rpccred(mdscred); + } else { + cred = get_rpccred(mdscred); + } return cred; } @@ -562,6 +551,18 @@ bool ff_layout_has_available_ds(struct pnfs_layout_segment *lseg) return ff_rw_layout_has_available_ds(lseg); } +bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg) +{ + return ff_layout_no_fallback_to_mds(lseg) || + ff_layout_has_available_ds(lseg); +} + +bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg) +{ + return lseg->pls_range.iomode == IOMODE_RW && + ff_layout_no_read_on_rw(lseg); +} + module_param(dataserver_retrans, uint, 0644); MODULE_PARM_DESC(dataserver_retrans, "The number of times the NFSv4.1 client " "retries a request before it attempts further " diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h index f1d1d2c472e9..5154fa65a2f2 100644 --- a/fs/nfs/internal.h +++ b/fs/nfs/internal.h @@ -477,6 +477,7 @@ void nfs_mark_request_commit(struct nfs_page *req, u32 ds_commit_idx); int nfs_write_need_commit(struct nfs_pgio_header *); void nfs_writeback_update_inode(struct nfs_pgio_header *hdr); +int nfs_commit_file(struct file *file, struct nfs_write_verifier *verf); int nfs_generic_commit_list(struct inode *inode, struct list_head *head, int how, struct nfs_commit_info *cinfo); void nfs_retry_commit(struct list_head *page_list, diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h index b587ccd31083..b6cd15314bab 100644 --- a/fs/nfs/nfs42.h +++ b/fs/nfs/nfs42.h @@ -13,6 +13,7 @@ /* nfs4.2proc.c */ int nfs42_proc_allocate(struct file *, loff_t, loff_t); +ssize_t nfs42_proc_copy(struct file *, loff_t, struct file *, loff_t, size_t); int nfs42_proc_deallocate(struct file *, loff_t, loff_t); loff_t nfs42_proc_llseek(struct file *, loff_t, int); int nfs42_proc_layoutstats_generic(struct nfs_server *, diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c index dff83460e5a6..aa03ed09ba06 100644 --- a/fs/nfs/nfs42proc.c +++ b/fs/nfs/nfs42proc.c @@ -126,6 +126,111 @@ int nfs42_proc_deallocate(struct file *filep, loff_t offset, loff_t len) return err; } +static ssize_t _nfs42_proc_copy(struct file *src, loff_t pos_src, + struct nfs_lock_context *src_lock, + struct file *dst, loff_t pos_dst, + struct nfs_lock_context *dst_lock, + size_t count) +{ + struct nfs42_copy_args args = { + .src_fh = NFS_FH(file_inode(src)), + .src_pos = pos_src, + .dst_fh = NFS_FH(file_inode(dst)), + .dst_pos = pos_dst, + .count = count, + }; + struct nfs42_copy_res res; + struct rpc_message msg = { + .rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_COPY], + .rpc_argp = &args, + .rpc_resp = &res, + }; + struct inode *dst_inode = file_inode(dst); + struct nfs_server *server = NFS_SERVER(dst_inode); + int status; + + status = nfs4_set_rw_stateid(&args.src_stateid, src_lock->open_context, + src_lock, FMODE_READ); + if (status) + return status; + + status = nfs4_set_rw_stateid(&args.dst_stateid, dst_lock->open_context, + dst_lock, FMODE_WRITE); + if (status) + return status; + + status = nfs4_call_sync(server->client, server, &msg, + &args.seq_args, &res.seq_res, 0); + if (status == -ENOTSUPP) + server->caps &= ~NFS_CAP_COPY; + if (status) + return status; + + if (res.write_res.verifier.committed != NFS_FILE_SYNC) { + status = nfs_commit_file(dst, &res.write_res.verifier.verifier); + if (status) + return status; + } + + truncate_pagecache_range(dst_inode, pos_dst, + pos_dst + res.write_res.count); + + return res.write_res.count; +} + +ssize_t nfs42_proc_copy(struct file *src, loff_t pos_src, + struct file *dst, loff_t pos_dst, + size_t count) +{ + struct nfs_server *server = NFS_SERVER(file_inode(dst)); + struct nfs_lock_context *src_lock; + struct nfs_lock_context *dst_lock; + struct nfs4_exception src_exception = { }; + struct nfs4_exception dst_exception = { }; + ssize_t err, err2; + + if (!nfs_server_capable(file_inode(dst), NFS_CAP_COPY)) + return -EOPNOTSUPP; + + src_lock = nfs_get_lock_context(nfs_file_open_context(src)); + if (IS_ERR(src_lock)) + return PTR_ERR(src_lock); + + src_exception.inode = file_inode(src); + src_exception.state = src_lock->open_context->state; + + dst_lock = nfs_get_lock_context(nfs_file_open_context(dst)); + if (IS_ERR(dst_lock)) { + err = PTR_ERR(dst_lock); + goto out_put_src_lock; + } + + dst_exception.inode = file_inode(dst); + dst_exception.state = dst_lock->open_context->state; + + do { + inode_lock(file_inode(dst)); + err = _nfs42_proc_copy(src, pos_src, src_lock, + dst, pos_dst, dst_lock, count); + inode_unlock(file_inode(dst)); + + if (err == -ENOTSUPP) { + err = -EOPNOTSUPP; + break; + } + + err2 = nfs4_handle_exception(server, err, &src_exception); + err = nfs4_handle_exception(server, err, &dst_exception); + if (!err) + err = err2; + } while (src_exception.retry || dst_exception.retry); + + nfs_put_lock_context(dst_lock); +out_put_src_lock: + nfs_put_lock_context(src_lock); + return err; +} + static loff_t _nfs42_proc_llseek(struct file *filep, struct nfs_lock_context *lock, loff_t offset, int whence) { @@ -232,7 +337,7 @@ nfs42_layoutstat_done(struct rpc_task *task, void *calldata) * with the current stateid. */ set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags); - pnfs_mark_matching_lsegs_invalid(lo, &head, NULL); + pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0); spin_unlock(&inode->i_lock); pnfs_free_lseg_list(&head); } else diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c index 0ca482a51e53..6dc6f2aea0d6 100644 --- a/fs/nfs/nfs42xdr.c +++ b/fs/nfs/nfs42xdr.c @@ -9,9 +9,22 @@ #define encode_fallocate_maxsz (encode_stateid_maxsz + \ 2 /* offset */ + \ 2 /* length */) +#define NFS42_WRITE_RES_SIZE (1 /* wr_callback_id size */ +\ + XDR_QUADLEN(NFS4_STATEID_SIZE) + \ + 2 /* wr_count */ + \ + 1 /* wr_committed */ + \ + XDR_QUADLEN(NFS4_VERIFIER_SIZE)) #define encode_allocate_maxsz (op_encode_hdr_maxsz + \ encode_fallocate_maxsz) #define decode_allocate_maxsz (op_decode_hdr_maxsz) +#define encode_copy_maxsz (op_encode_hdr_maxsz + \ + XDR_QUADLEN(NFS4_STATEID_SIZE) + \ + XDR_QUADLEN(NFS4_STATEID_SIZE) + \ + 2 + 2 + 2 + 1 + 1 + 1) +#define decode_copy_maxsz (op_decode_hdr_maxsz + \ + NFS42_WRITE_RES_SIZE + \ + 1 /* cr_consecutive */ + \ + 1 /* cr_synchronous */) #define encode_deallocate_maxsz (op_encode_hdr_maxsz + \ encode_fallocate_maxsz) #define decode_deallocate_maxsz (op_decode_hdr_maxsz) @@ -49,6 +62,16 @@ decode_putfh_maxsz + \ decode_allocate_maxsz + \ decode_getattr_maxsz) +#define NFS4_enc_copy_sz (compound_encode_hdr_maxsz + \ + encode_putfh_maxsz + \ + encode_savefh_maxsz + \ + encode_putfh_maxsz + \ + encode_copy_maxsz) +#define NFS4_dec_copy_sz (compound_decode_hdr_maxsz + \ + decode_putfh_maxsz + \ + decode_savefh_maxsz + \ + decode_putfh_maxsz + \ + decode_copy_maxsz) #define NFS4_enc_deallocate_sz (compound_encode_hdr_maxsz + \ encode_putfh_maxsz + \ encode_deallocate_maxsz + \ @@ -102,6 +125,23 @@ static void encode_allocate(struct xdr_stream *xdr, encode_fallocate(xdr, args); } +static void encode_copy(struct xdr_stream *xdr, + struct nfs42_copy_args *args, + struct compound_hdr *hdr) +{ + encode_op_hdr(xdr, OP_COPY, decode_copy_maxsz, hdr); + encode_nfs4_stateid(xdr, &args->src_stateid); + encode_nfs4_stateid(xdr, &args->dst_stateid); + + encode_uint64(xdr, args->src_pos); + encode_uint64(xdr, args->dst_pos); + encode_uint64(xdr, args->count); + + encode_uint32(xdr, 1); /* consecutive = true */ + encode_uint32(xdr, 1); /* synchronous = true */ + encode_uint32(xdr, 0); /* src server list */ +} + static void encode_deallocate(struct xdr_stream *xdr, struct nfs42_falloc_args *args, struct compound_hdr *hdr) @@ -182,6 +222,26 @@ static void nfs4_xdr_enc_allocate(struct rpc_rqst *req, } /* + * Encode COPY request + */ +static void nfs4_xdr_enc_copy(struct rpc_rqst *req, + struct xdr_stream *xdr, + struct nfs42_copy_args *args) +{ + struct compound_hdr hdr = { + .minorversion = nfs4_xdr_minorversion(&args->seq_args), + }; + + encode_compound_hdr(xdr, req, &hdr); + encode_sequence(xdr, &args->seq_args, &hdr); + encode_putfh(xdr, args->src_fh, &hdr); + encode_savefh(xdr, &hdr); + encode_putfh(xdr, args->dst_fh, &hdr); + encode_copy(xdr, args, &hdr); + encode_nops(&hdr); +} + +/* * Encode DEALLOCATE request */ static void nfs4_xdr_enc_deallocate(struct rpc_rqst *req, @@ -266,6 +326,62 @@ static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res) return decode_op_hdr(xdr, OP_ALLOCATE); } +static int decode_write_response(struct xdr_stream *xdr, + struct nfs42_write_res *res) +{ + __be32 *p; + int stateids; + + p = xdr_inline_decode(xdr, 4 + 8 + 4); + if (unlikely(!p)) + goto out_overflow; + + stateids = be32_to_cpup(p++); + p = xdr_decode_hyper(p, &res->count); + res->verifier.committed = be32_to_cpup(p); + return decode_verifier(xdr, &res->verifier.verifier); + +out_overflow: + print_overflow_msg(__func__, xdr); + return -EIO; +} + +static int decode_copy_requirements(struct xdr_stream *xdr, + struct nfs42_copy_res *res) { + __be32 *p; + + p = xdr_inline_decode(xdr, 4 + 4); + if (unlikely(!p)) + goto out_overflow; + + res->consecutive = be32_to_cpup(p++); + res->synchronous = be32_to_cpup(p++); + return 0; +out_overflow: + print_overflow_msg(__func__, xdr); + return -EIO; +} + +static int decode_copy(struct xdr_stream *xdr, struct nfs42_copy_res *res) +{ + int status; + + status = decode_op_hdr(xdr, OP_COPY); + if (status == NFS4ERR_OFFLOAD_NO_REQS) { + status = decode_copy_requirements(xdr, res); + if (status) + return status; + return NFS4ERR_OFFLOAD_NO_REQS; + } else if (status) + return status; + + status = decode_write_response(xdr, &res->write_res); + if (status) + return status; + + return decode_copy_requirements(xdr, res); +} + static int decode_deallocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res) { return decode_op_hdr(xdr, OP_DEALLOCATE); @@ -331,6 +447,36 @@ out: } /* + * Decode COPY response + */ +static int nfs4_xdr_dec_copy(struct rpc_rqst *rqstp, + struct xdr_stream *xdr, + struct nfs42_copy_res *res) +{ + struct compound_hdr hdr; + int status; + + status = decode_compound_hdr(xdr, &hdr); + if (status) + goto out; + status = decode_sequence(xdr, &res->seq_res, rqstp); + if (status) + goto out; + status = decode_putfh(xdr); + if (status) + goto out; + status = decode_savefh(xdr); + if (status) + goto out; + status = decode_putfh(xdr); + if (status) + goto out; + status = decode_copy(xdr, res); +out: + return status; +} + +/* * Decode DEALLOCATE request */ static int nfs4_xdr_dec_deallocate(struct rpc_rqst *rqstp, diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h index 4afdee420d25..768456fa1b17 100644 --- a/fs/nfs/nfs4_fs.h +++ b/fs/nfs/nfs4_fs.h @@ -438,8 +438,9 @@ extern void nfs41_handle_server_scope(struct nfs_client *, struct nfs41_server_scope **); extern void nfs4_put_lock_state(struct nfs4_lock_state *lsp); extern int nfs4_set_lock_state(struct nfs4_state *state, struct file_lock *fl); -extern int nfs4_select_rw_stateid(nfs4_stateid *, struct nfs4_state *, - fmode_t, const struct nfs_lockowner *); +extern int nfs4_select_rw_stateid(struct nfs4_state *, fmode_t, + const struct nfs_lockowner *, nfs4_stateid *, + struct rpc_cred **); extern struct nfs_seqid *nfs_alloc_seqid(struct nfs_seqid_counter *counter, gfp_t gfp_mask); extern int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task); @@ -496,12 +497,15 @@ extern struct svc_version nfs4_callback_version4; static inline void nfs4_stateid_copy(nfs4_stateid *dst, const nfs4_stateid *src) { - memcpy(dst, src, sizeof(*dst)); + memcpy(dst->data, src->data, sizeof(dst->data)); + dst->type = src->type; } static inline bool nfs4_stateid_match(const nfs4_stateid *dst, const nfs4_stateid *src) { - return memcmp(dst, src, sizeof(*dst)) == 0; + if (dst->type != src->type) + return false; + return memcmp(dst->data, src->data, sizeof(dst->data)) == 0; } static inline bool nfs4_stateid_match_other(const nfs4_stateid *dst, const nfs4_stateid *src) diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c index d0390516467c..014b0e41ace5 100644 --- a/fs/nfs/nfs4file.c +++ b/fs/nfs/nfs4file.c @@ -129,6 +129,28 @@ nfs4_file_flush(struct file *file, fl_owner_t id) } #ifdef CONFIG_NFS_V4_2 +static ssize_t nfs4_copy_file_range(struct file *file_in, loff_t pos_in, + struct file *file_out, loff_t pos_out, + size_t count, unsigned int flags) +{ + struct inode *in_inode = file_inode(file_in); + struct inode *out_inode = file_inode(file_out); + int ret; + + if (in_inode == out_inode) + return -EINVAL; + + /* flush any pending writes */ + ret = nfs_sync_inode(in_inode); + if (ret) + return ret; + ret = nfs_sync_inode(out_inode); + if (ret) + return ret; + + return nfs42_proc_copy(file_in, pos_in, file_out, pos_out, count); +} + static loff_t nfs4_file_llseek(struct file *filep, loff_t offset, int whence) { loff_t ret; @@ -243,6 +265,7 @@ const struct file_operations nfs4_file_operations = { .check_flags = nfs_check_flags, .setlease = simple_nosetlease, #ifdef CONFIG_NFS_V4_2 + .copy_file_range = nfs4_copy_file_range, .llseek = nfs4_file_llseek, .fallocate = nfs42_fallocate, .clone_file_range = nfs42_clone_file_range, diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c index 084e8570da18..223982eb38c9 100644 --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@ -74,6 +74,17 @@ #define NFS4_POLL_RETRY_MIN (HZ/10) #define NFS4_POLL_RETRY_MAX (15*HZ) +/* file attributes which can be mapped to nfs attributes */ +#define NFS4_VALID_ATTRS (ATTR_MODE \ + | ATTR_UID \ + | ATTR_GID \ + | ATTR_SIZE \ + | ATTR_ATIME \ + | ATTR_MTIME \ + | ATTR_CTIME \ + | ATTR_ATIME_SET \ + | ATTR_MTIME_SET) + struct nfs4_opendata; static int _nfs4_proc_open(struct nfs4_opendata *data); static int _nfs4_recover_proc_open(struct nfs4_opendata *data); @@ -416,6 +427,7 @@ static int nfs4_do_handle_exception(struct nfs_server *server, case -NFS4ERR_DELAY: nfs_inc_server_stats(server, NFSIOS_DELAY); case -NFS4ERR_GRACE: + case -NFS4ERR_RECALLCONFLICT: exception->delay = 1; return 0; @@ -2558,15 +2570,20 @@ static int _nfs4_do_open(struct inode *dir, if ((opendata->o_arg.open_flags & (O_CREAT|O_EXCL)) == (O_CREAT|O_EXCL) && (opendata->o_arg.createmode != NFS4_CREATE_GUARDED)) { nfs4_exclusive_attrset(opendata, sattr, &label); - - nfs_fattr_init(opendata->o_res.f_attr); - status = nfs4_do_setattr(state->inode, cred, - opendata->o_res.f_attr, sattr, - state, label, olabel); - if (status == 0) { - nfs_setattr_update_inode(state->inode, sattr, - opendata->o_res.f_attr); - nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel); + /* + * send create attributes which was not set by open + * with an extra setattr. + */ + if (sattr->ia_valid & NFS4_VALID_ATTRS) { + nfs_fattr_init(opendata->o_res.f_attr); + status = nfs4_do_setattr(state->inode, cred, + opendata->o_res.f_attr, sattr, + state, label, olabel); + if (status == 0) { + nfs_setattr_update_inode(state->inode, sattr, + opendata->o_res.f_attr); + nfs_setsecurity(state->inode, opendata->o_res.f_attr, olabel); + } } } if (opened && opendata->file_created) @@ -2676,6 +2693,7 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred, .rpc_resp = &res, .rpc_cred = cred, }; + struct rpc_cred *delegation_cred = NULL; unsigned long timestamp = jiffies; fmode_t fmode; bool truncate; @@ -2691,7 +2709,7 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred, truncate = (sattr->ia_valid & ATTR_SIZE) ? true : false; fmode = truncate ? FMODE_WRITE : FMODE_READ; - if (nfs4_copy_delegation_stateid(&arg.stateid, inode, fmode)) { + if (nfs4_copy_delegation_stateid(inode, fmode, &arg.stateid, &delegation_cred)) { /* Use that stateid */ } else if (truncate && state != NULL) { struct nfs_lockowner lockowner = { @@ -2700,13 +2718,17 @@ static int _nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred, }; if (!nfs4_valid_open_stateid(state)) return -EBADF; - if (nfs4_select_rw_stateid(&arg.stateid, state, FMODE_WRITE, - &lockowner) == -EIO) + if (nfs4_select_rw_stateid(state, FMODE_WRITE, &lockowner, + &arg.stateid, &delegation_cred) == -EIO) return -EBADF; } else nfs4_stateid_copy(&arg.stateid, &zero_stateid); + if (delegation_cred) + msg.rpc_cred = delegation_cred; status = nfs4_call_sync(server->client, server, &msg, &arg.seq_args, &res.seq_res, 1); + + put_rpccred(delegation_cred); if (status == 0 && state != NULL) renew_lease(server, timestamp); trace_nfs4_setattr(inode, &arg.stateid, status); @@ -4285,7 +4307,7 @@ int nfs4_set_rw_stateid(nfs4_stateid *stateid, if (l_ctx != NULL) lockowner = &l_ctx->lockowner; - return nfs4_select_rw_stateid(stateid, ctx->state, fmode, lockowner); + return nfs4_select_rw_stateid(ctx->state, fmode, lockowner, stateid, NULL); } EXPORT_SYMBOL_GPL(nfs4_set_rw_stateid); @@ -6054,6 +6076,7 @@ static int nfs41_lock_expired(struct nfs4_state *state, struct file_lock *reques static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock *request) { struct nfs_inode *nfsi = NFS_I(state->inode); + struct nfs4_state_owner *sp = state->owner; unsigned char fl_flags = request->fl_flags; int status = -ENOLCK; @@ -6068,6 +6091,7 @@ static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock status = do_vfs_lock(state->inode, request); if (status < 0) goto out; + mutex_lock(&sp->so_delegreturn_mutex); down_read(&nfsi->rwsem); if (test_bit(NFS_DELEGATED_STATE, &state->flags)) { /* Yes: cache locks! */ @@ -6075,9 +6099,11 @@ static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock request->fl_flags = fl_flags & ~FL_SLEEP; status = do_vfs_lock(state->inode, request); up_read(&nfsi->rwsem); + mutex_unlock(&sp->so_delegreturn_mutex); goto out; } up_read(&nfsi->rwsem); + mutex_unlock(&sp->so_delegreturn_mutex); status = _nfs4_do_setlk(state, cmd, request, NFS_LOCK_NEW); out: request->fl_flags = fl_flags; @@ -7351,9 +7377,11 @@ int nfs4_proc_get_lease_time(struct nfs_client *clp, struct nfs_fsinfo *fsinfo) * always set csa_cachethis to FALSE because the current implementation * of the back channel DRC only supports caching the CB_SEQUENCE operation. */ -static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args) +static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args, + struct rpc_clnt *clnt) { unsigned int max_rqst_sz, max_resp_sz; + unsigned int max_bc_payload = rpc_max_bc_payload(clnt); max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead; max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead; @@ -7371,8 +7399,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args) args->fc_attrs.max_ops, args->fc_attrs.max_reqs); /* Back channel attributes */ - args->bc_attrs.max_rqst_sz = PAGE_SIZE; - args->bc_attrs.max_resp_sz = PAGE_SIZE; + args->bc_attrs.max_rqst_sz = max_bc_payload; + args->bc_attrs.max_resp_sz = max_bc_payload; args->bc_attrs.max_resp_sz_cached = 0; args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS; args->bc_attrs.max_reqs = NFS41_BC_MAX_CALLBACKS; @@ -7476,7 +7504,7 @@ static int _nfs4_proc_create_session(struct nfs_client *clp, }; int status; - nfs4_init_channel_attrs(&args); + nfs4_init_channel_attrs(&args, clp->cl_rpcclient); args.flags = (SESSION4_PERSIST | SESSION4_BACK_CHAN); status = rpc_call_sync(session->clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT); @@ -7820,40 +7848,34 @@ nfs4_layoutget_prepare(struct rpc_task *task, void *calldata) struct nfs4_layoutget *lgp = calldata; struct nfs_server *server = NFS_SERVER(lgp->args.inode); struct nfs4_session *session = nfs4_get_session(server); - int ret; dprintk("--> %s\n", __func__); - /* Note the is a race here, where a CB_LAYOUTRECALL can come in - * right now covering the LAYOUTGET we are about to send. - * However, that is not so catastrophic, and there seems - * to be no way to prevent it completely. - */ - if (nfs41_setup_sequence(session, &lgp->args.seq_args, - &lgp->res.seq_res, task)) - return; - ret = pnfs_choose_layoutget_stateid(&lgp->args.stateid, - NFS_I(lgp->args.inode)->layout, - &lgp->args.range, - lgp->args.ctx->state); - if (ret < 0) - rpc_exit(task, ret); + nfs41_setup_sequence(session, &lgp->args.seq_args, + &lgp->res.seq_res, task); + dprintk("<-- %s\n", __func__); } static void nfs4_layoutget_done(struct rpc_task *task, void *calldata) { struct nfs4_layoutget *lgp = calldata; + + dprintk("--> %s\n", __func__); + nfs41_sequence_done(task, &lgp->res.seq_res); + dprintk("<-- %s\n", __func__); +} + +static int +nfs4_layoutget_handle_exception(struct rpc_task *task, + struct nfs4_layoutget *lgp, struct nfs4_exception *exception) +{ struct inode *inode = lgp->args.inode; struct nfs_server *server = NFS_SERVER(inode); struct pnfs_layout_hdr *lo; - struct nfs4_state *state = NULL; - unsigned long timeo, now, giveup; + int status = task->tk_status; dprintk("--> %s tk_status => %d\n", __func__, -task->tk_status); - if (!nfs41_sequence_done(task, &lgp->res.seq_res)) - goto out; - - switch (task->tk_status) { + switch (status) { case 0: goto out; @@ -7863,57 +7885,43 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata) * retry go inband. */ case -NFS4ERR_LAYOUTUNAVAILABLE: - task->tk_status = -ENODATA; + status = -ENODATA; goto out; /* * NFS4ERR_BADLAYOUT means the MDS cannot return a layout of * length lgp->args.minlength != 0 (see RFC5661 section 18.43.3). */ case -NFS4ERR_BADLAYOUT: - goto out_overflow; + status = -EOVERFLOW; + goto out; /* * NFS4ERR_LAYOUTTRYLATER is a conflict with another client * (or clients) writing to the same RAID stripe except when * the minlength argument is 0 (see RFC5661 section 18.43.3). + * + * Treat it like we would RECALLCONFLICT -- we retry for a little + * while, and then eventually give up. */ case -NFS4ERR_LAYOUTTRYLATER: - if (lgp->args.minlength == 0) - goto out_overflow; - /* - * NFS4ERR_RECALLCONFLICT is when conflict with self (must recall - * existing layout before getting a new one). - */ - case -NFS4ERR_RECALLCONFLICT: - timeo = rpc_get_timeout(task->tk_client); - giveup = lgp->args.timestamp + timeo; - now = jiffies; - if (time_after(giveup, now)) { - unsigned long delay; - - /* Delay for: - * - Not less then NFS4_POLL_RETRY_MIN. - * - One last time a jiffie before we give up - * - exponential backoff (time_now minus start_attempt) - */ - delay = max_t(unsigned long, NFS4_POLL_RETRY_MIN, - min((giveup - now - 1), - now - lgp->args.timestamp)); - - dprintk("%s: NFS4ERR_RECALLCONFLICT waiting %lu\n", - __func__, delay); - rpc_delay(task, delay); - /* Do not call nfs4_async_handle_error() */ - goto out_restart; + if (lgp->args.minlength == 0) { + status = -EOVERFLOW; + goto out; } - break; + /* Fallthrough */ + case -NFS4ERR_RECALLCONFLICT: + nfs4_handle_exception(server, -NFS4ERR_RECALLCONFLICT, + exception); + status = -ERECALLCONFLICT; + goto out; case -NFS4ERR_EXPIRED: case -NFS4ERR_BAD_STATEID: + exception->timeout = 0; spin_lock(&inode->i_lock); if (nfs4_stateid_match(&lgp->args.stateid, &lgp->args.ctx->state->stateid)) { spin_unlock(&inode->i_lock); /* If the open stateid was bad, then recover it. */ - state = lgp->args.ctx->state; + exception->state = lgp->args.ctx->state; break; } lo = NFS_I(inode)->layout; @@ -7926,25 +7934,21 @@ static void nfs4_layoutget_done(struct rpc_task *task, void *calldata) * with the current stateid. */ set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags); - pnfs_mark_matching_lsegs_invalid(lo, &head, NULL); + pnfs_mark_matching_lsegs_invalid(lo, &head, NULL, 0); spin_unlock(&inode->i_lock); pnfs_free_lseg_list(&head); } else spin_unlock(&inode->i_lock); - goto out_restart; + status = -EAGAIN; + goto out; } - if (nfs4_async_handle_error(task, server, state, &lgp->timeout) == -EAGAIN) - goto out_restart; + + status = nfs4_handle_exception(server, status, exception); + if (exception->retry) + status = -EAGAIN; out: dprintk("<-- %s\n", __func__); - return; -out_restart: - task->tk_status = 0; - rpc_restart_call_prepare(task); - return; -out_overflow: - task->tk_status = -EOVERFLOW; - goto out; + return status; } static size_t max_response_pages(struct nfs_server *server) @@ -8013,7 +8017,7 @@ static const struct rpc_call_ops nfs4_layoutget_call_ops = { }; struct pnfs_layout_segment * -nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags) +nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout, gfp_t gfp_flags) { struct inode *inode = lgp->args.inode; struct nfs_server *server = NFS_SERVER(inode); @@ -8033,6 +8037,7 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags) .flags = RPC_TASK_ASYNC, }; struct pnfs_layout_segment *lseg = NULL; + struct nfs4_exception exception = { .timeout = *timeout }; int status = 0; dprintk("--> %s\n", __func__); @@ -8046,7 +8051,6 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags) return ERR_PTR(-ENOMEM); } lgp->args.layout.pglen = max_pages * PAGE_SIZE; - lgp->args.timestamp = jiffies; lgp->res.layoutp = &lgp->args.layout; lgp->res.seq_res.sr_slot = NULL; @@ -8056,13 +8060,17 @@ nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags) if (IS_ERR(task)) return ERR_CAST(task); status = nfs4_wait_for_completion_rpc_task(task); - if (status == 0) - status = task->tk_status; + if (status == 0) { + status = nfs4_layoutget_handle_exception(task, lgp, &exception); + *timeout = exception.timeout; + } + trace_nfs4_layoutget(lgp->args.ctx, &lgp->args.range, &lgp->res.range, &lgp->res.stateid, status); + /* if layoutp->len is 0, nfs4_layoutget_prepare called rpc_exit */ if (status == 0 && lgp->res.layoutp->len) lseg = pnfs_layout_process(lgp); @@ -8118,7 +8126,8 @@ static void nfs4_layoutreturn_release(void *calldata) dprintk("--> %s\n", __func__); spin_lock(&lo->plh_inode->i_lock); - pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range); + pnfs_mark_matching_lsegs_invalid(lo, &freeme, &lrp->args.range, + be32_to_cpu(lrp->args.stateid.seqid)); pnfs_mark_layout_returned_if_empty(lo); if (lrp->res.lrs_present) pnfs_set_layout_stateid(lo, &lrp->res.stateid, true); @@ -8653,6 +8662,9 @@ nfs41_free_lock_state(struct nfs_server *server, struct nfs4_lock_state *lsp) static bool nfs41_match_stateid(const nfs4_stateid *s1, const nfs4_stateid *s2) { + if (s1->type != s2->type) + return false; + if (memcmp(s1->other, s2->other, sizeof(s1->other)) != 0) return false; @@ -8793,6 +8805,7 @@ static const struct nfs4_minor_version_ops nfs_v4_2_minor_ops = { | NFS_CAP_STATEID_NFSV41 | NFS_CAP_ATOMIC_OPEN_V1 | NFS_CAP_ALLOCATE + | NFS_CAP_COPY | NFS_CAP_DEALLOCATE | NFS_CAP_SEEK | NFS_CAP_LAYOUTSTATS diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c index d854693a15b0..5075592df145 100644 --- a/fs/nfs/nfs4state.c +++ b/fs/nfs/nfs4state.c @@ -65,7 +65,10 @@ #define OPENOWNER_POOL_SIZE 8 -const nfs4_stateid zero_stateid; +const nfs4_stateid zero_stateid = { + .data = { 0 }, + .type = NFS4_SPECIAL_STATEID_TYPE, +}; static DEFINE_MUTEX(nfs_clid_init_mutex); int nfs4_init_clientid(struct nfs_client *clp, struct rpc_cred *cred) @@ -985,15 +988,20 @@ static void nfs4_copy_open_stateid(nfs4_stateid *dst, struct nfs4_state *state) * Byte-range lock aware utility to initialize the stateid of read/write * requests. */ -int nfs4_select_rw_stateid(nfs4_stateid *dst, struct nfs4_state *state, - fmode_t fmode, const struct nfs_lockowner *lockowner) +int nfs4_select_rw_stateid(struct nfs4_state *state, + fmode_t fmode, const struct nfs_lockowner *lockowner, + nfs4_stateid *dst, struct rpc_cred **cred) { - int ret = nfs4_copy_lock_stateid(dst, state, lockowner); + int ret; + + if (cred != NULL) + *cred = NULL; + ret = nfs4_copy_lock_stateid(dst, state, lockowner); if (ret == -EIO) /* A lost lock - don't even consider delegations */ goto out; /* returns true if delegation stateid found and copied */ - if (nfs4_copy_delegation_stateid(dst, state->inode, fmode)) { + if (nfs4_copy_delegation_stateid(state->inode, fmode, dst, cred)) { ret = 0; goto out; } diff --git a/fs/nfs/nfs4trace.h b/fs/nfs/nfs4trace.h index 2c8d05dae5b1..9c150b153782 100644 --- a/fs/nfs/nfs4trace.h +++ b/fs/nfs/nfs4trace.h @@ -1520,6 +1520,8 @@ DEFINE_NFS4_INODE_EVENT(nfs4_layoutreturn_on_close); { PNFS_UPDATE_LAYOUT_FOUND_CACHED, "found cached" }, \ { PNFS_UPDATE_LAYOUT_RETURN, "layoutreturn" }, \ { PNFS_UPDATE_LAYOUT_BLOCKED, "layouts blocked" }, \ + { PNFS_UPDATE_LAYOUT_INVALID_OPEN, "invalid open" }, \ + { PNFS_UPDATE_LAYOUT_RETRY, "retrying" }, \ { PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET, "sent layoutget" }) TRACE_EVENT(pnfs_update_layout, @@ -1528,9 +1530,10 @@ TRACE_EVENT(pnfs_update_layout, u64 count, enum pnfs_iomode iomode, struct pnfs_layout_hdr *lo, + struct pnfs_layout_segment *lseg, enum pnfs_update_layout_reason reason ), - TP_ARGS(inode, pos, count, iomode, lo, reason), + TP_ARGS(inode, pos, count, iomode, lo, lseg, reason), TP_STRUCT__entry( __field(dev_t, dev) __field(u64, fileid) @@ -1540,6 +1543,7 @@ TRACE_EVENT(pnfs_update_layout, __field(enum pnfs_iomode, iomode) __field(int, layoutstateid_seq) __field(u32, layoutstateid_hash) + __field(long, lseg) __field(enum pnfs_update_layout_reason, reason) ), TP_fast_assign( @@ -1559,11 +1563,12 @@ TRACE_EVENT(pnfs_update_layout, __entry->layoutstateid_seq = 0; __entry->layoutstateid_hash = 0; } + __entry->lseg = (long)lseg; ), TP_printk( "fileid=%02x:%02x:%llu fhandle=0x%08x " "iomode=%s pos=%llu count=%llu " - "layoutstateid=%d:0x%08x (%s)", + "layoutstateid=%d:0x%08x lseg=0x%lx (%s)", MAJOR(__entry->dev), MINOR(__entry->dev), (unsigned long long)__entry->fileid, __entry->fhandle, @@ -1571,6 +1576,7 @@ TRACE_EVENT(pnfs_update_layout, (unsigned long long)__entry->pos, (unsigned long long)__entry->count, __entry->layoutstateid_seq, __entry->layoutstateid_hash, + __entry->lseg, show_pnfs_update_layout_reason(__entry->reason) ) ); diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c index 88474a4fc669..661e753fe1c9 100644 --- a/fs/nfs/nfs4xdr.c +++ b/fs/nfs/nfs4xdr.c @@ -4270,6 +4270,24 @@ static int decode_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) return decode_opaque_fixed(xdr, stateid, NFS4_STATEID_SIZE); } +static int decode_open_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_OPEN_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} + +static int decode_lock_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_LOCK_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} + +static int decode_delegation_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_DELEGATION_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} + static int decode_close(struct xdr_stream *xdr, struct nfs_closeres *res) { int status; @@ -4278,7 +4296,7 @@ static int decode_close(struct xdr_stream *xdr, struct nfs_closeres *res) if (status != -EIO) nfs_increment_open_seqid(status, res->seqid); if (!status) - status = decode_stateid(xdr, &res->stateid); + status = decode_open_stateid(xdr, &res->stateid); return status; } @@ -4937,7 +4955,7 @@ static int decode_lock(struct xdr_stream *xdr, struct nfs_lock_res *res) if (status == -EIO) goto out; if (status == 0) { - status = decode_stateid(xdr, &res->stateid); + status = decode_lock_stateid(xdr, &res->stateid); if (unlikely(status)) goto out; } else if (status == -NFS4ERR_DENIED) @@ -4966,7 +4984,7 @@ static int decode_locku(struct xdr_stream *xdr, struct nfs_locku_res *res) if (status != -EIO) nfs_increment_lock_seqid(status, res->seqid); if (status == 0) - status = decode_stateid(xdr, &res->stateid); + status = decode_lock_stateid(xdr, &res->stateid); return status; } @@ -5016,7 +5034,7 @@ static int decode_rw_delegation(struct xdr_stream *xdr, __be32 *p; int status; - status = decode_stateid(xdr, &res->delegation); + status = decode_delegation_stateid(xdr, &res->delegation); if (unlikely(status)) return status; p = xdr_inline_decode(xdr, 4); @@ -5096,7 +5114,7 @@ static int decode_open(struct xdr_stream *xdr, struct nfs_openres *res) nfs_increment_open_seqid(status, res->seqid); if (status) return status; - status = decode_stateid(xdr, &res->stateid); + status = decode_open_stateid(xdr, &res->stateid); if (unlikely(status)) return status; @@ -5136,7 +5154,7 @@ static int decode_open_confirm(struct xdr_stream *xdr, struct nfs_open_confirmre if (status != -EIO) nfs_increment_open_seqid(status, res->seqid); if (!status) - status = decode_stateid(xdr, &res->stateid); + status = decode_open_stateid(xdr, &res->stateid); return status; } @@ -5148,7 +5166,7 @@ static int decode_open_downgrade(struct xdr_stream *xdr, struct nfs_closeres *re if (status != -EIO) nfs_increment_open_seqid(status, res->seqid); if (!status) - status = decode_stateid(xdr, &res->stateid); + status = decode_open_stateid(xdr, &res->stateid); return status; } @@ -5838,6 +5856,12 @@ out_overflow: } #if defined(CONFIG_NFS_V4_1) +static int decode_layout_stateid(struct xdr_stream *xdr, nfs4_stateid *stateid) +{ + stateid->type = NFS4_LAYOUT_STATEID_TYPE; + return decode_stateid(xdr, stateid); +} + static int decode_getdeviceinfo(struct xdr_stream *xdr, struct nfs4_getdeviceinfo_res *res) { @@ -5919,7 +5943,7 @@ static int decode_layoutget(struct xdr_stream *xdr, struct rpc_rqst *req, if (unlikely(!p)) goto out_overflow; res->return_on_close = be32_to_cpup(p); - decode_stateid(xdr, &res->stateid); + decode_layout_stateid(xdr, &res->stateid); p = xdr_inline_decode(xdr, 4); if (unlikely(!p)) goto out_overflow; @@ -5985,7 +6009,7 @@ static int decode_layoutreturn(struct xdr_stream *xdr, goto out_overflow; res->lrs_present = be32_to_cpup(p); if (res->lrs_present) - status = decode_stateid(xdr, &res->stateid); + status = decode_layout_stateid(xdr, &res->stateid); return status; out_overflow: print_overflow_msg(__func__, xdr); @@ -7515,6 +7539,7 @@ struct rpc_procinfo nfs4_procedures[] = { PROC(DEALLOCATE, enc_deallocate, dec_deallocate), PROC(LAYOUTSTATS, enc_layoutstats, dec_layoutstats), PROC(CLONE, enc_clone, dec_clone), + PROC(COPY, enc_copy, dec_copy), #endif /* CONFIG_NFS_V4_2 */ }; diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c index 1f6db4231057..174dd4cf5747 100644 --- a/fs/nfs/pagelist.c +++ b/fs/nfs/pagelist.c @@ -341,8 +341,10 @@ nfs_create_request(struct nfs_open_context *ctx, struct page *page, * long write-back delay. This will be adjusted in * update_nfs_request below if the region is not locked. */ req->wb_page = page; - req->wb_index = page_file_index(page); - get_page(page); + if (page) { + req->wb_index = page_file_index(page); + get_page(page); + } req->wb_offset = offset; req->wb_pgbase = offset; req->wb_bytes = count; diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c index 89a5ef4df08a..0c7e0d45a4de 100644 --- a/fs/nfs/pnfs.c +++ b/fs/nfs/pnfs.c @@ -270,7 +270,7 @@ pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo, }; set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags); - return pnfs_mark_matching_lsegs_invalid(lo, lseg_list, &range); + return pnfs_mark_matching_lsegs_invalid(lo, lseg_list, &range, 0); } static int @@ -308,7 +308,7 @@ pnfs_layout_io_set_failed(struct pnfs_layout_hdr *lo, u32 iomode) spin_lock(&inode->i_lock); pnfs_layout_set_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode)); - pnfs_mark_matching_lsegs_invalid(lo, &head, &range); + pnfs_mark_matching_lsegs_invalid(lo, &head, &range, 0); spin_unlock(&inode->i_lock); pnfs_free_lseg_list(&head); dprintk("%s Setting layout IOMODE_%s fail bit\n", __func__, @@ -522,13 +522,35 @@ static int mark_lseg_invalid(struct pnfs_layout_segment *lseg, return rv; } -/* Returns count of number of matching invalid lsegs remaining in list - * after call. +/* + * Compare 2 layout stateid sequence ids, to see which is newer, + * taking into account wraparound issues. + */ +static bool pnfs_seqid_is_newer(u32 s1, u32 s2) +{ + return (s32)(s1 - s2) > 0; +} + +/** + * pnfs_mark_matching_lsegs_invalid - tear down lsegs or mark them for later + * @lo: layout header containing the lsegs + * @tmp_list: list head where doomed lsegs should go + * @recall_range: optional recall range argument to match (may be NULL) + * @seq: only invalidate lsegs obtained prior to this sequence (may be 0) + * + * Walk the list of lsegs in the layout header, and tear down any that should + * be destroyed. If "recall_range" is specified then the segment must match + * that range. If "seq" is non-zero, then only match segments that were handed + * out at or before that sequence. + * + * Returns number of matching invalid lsegs remaining in list after scanning + * it and purging them. */ int pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo, struct list_head *tmp_list, - const struct pnfs_layout_range *recall_range) + const struct pnfs_layout_range *recall_range, + u32 seq) { struct pnfs_layout_segment *lseg, *next; int remaining = 0; @@ -540,10 +562,12 @@ pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo, list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list) if (!recall_range || should_free_lseg(&lseg->pls_range, recall_range)) { - dprintk("%s: freeing lseg %p iomode %d " + if (seq && pnfs_seqid_is_newer(lseg->pls_seq, seq)) + continue; + dprintk("%s: freeing lseg %p iomode %d seq %u" "offset %llu length %llu\n", __func__, - lseg, lseg->pls_range.iomode, lseg->pls_range.offset, - lseg->pls_range.length); + lseg, lseg->pls_range.iomode, lseg->pls_seq, + lseg->pls_range.offset, lseg->pls_range.length); if (!mark_lseg_invalid(lseg, tmp_list)) remaining++; } @@ -730,15 +754,6 @@ pnfs_destroy_all_layouts(struct nfs_client *clp) pnfs_destroy_layouts_byclid(clp, false); } -/* - * Compare 2 layout stateid sequence ids, to see which is newer, - * taking into account wraparound issues. - */ -static bool pnfs_seqid_is_newer(u32 s1, u32 s2) -{ - return (s32)(s1 - s2) > 0; -} - /* update lo->plh_stateid with new if is more recent */ void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new, @@ -781,50 +796,22 @@ pnfs_layoutgets_blocked(const struct pnfs_layout_hdr *lo) test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags); } -int -pnfs_choose_layoutget_stateid(nfs4_stateid *dst, struct pnfs_layout_hdr *lo, - const struct pnfs_layout_range *range, - struct nfs4_state *open_state) -{ - int status = 0; - - dprintk("--> %s\n", __func__); - spin_lock(&lo->plh_inode->i_lock); - if (pnfs_layoutgets_blocked(lo)) { - status = -EAGAIN; - } else if (!nfs4_valid_open_stateid(open_state)) { - status = -EBADF; - } else if (list_empty(&lo->plh_segs) || - test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags)) { - int seq; - - do { - seq = read_seqbegin(&open_state->seqlock); - nfs4_stateid_copy(dst, &open_state->stateid); - } while (read_seqretry(&open_state->seqlock, seq)); - } else - nfs4_stateid_copy(dst, &lo->plh_stateid); - spin_unlock(&lo->plh_inode->i_lock); - dprintk("<-- %s\n", __func__); - return status; -} - /* -* Get layout from server. -* for now, assume that whole file layouts are requested. -* arg->offset: 0 -* arg->length: all ones -*/ + * Get layout from server. + * for now, assume that whole file layouts are requested. + * arg->offset: 0 + * arg->length: all ones + */ static struct pnfs_layout_segment * send_layoutget(struct pnfs_layout_hdr *lo, struct nfs_open_context *ctx, + nfs4_stateid *stateid, const struct pnfs_layout_range *range, - gfp_t gfp_flags) + long *timeout, gfp_t gfp_flags) { struct inode *ino = lo->plh_inode; struct nfs_server *server = NFS_SERVER(ino); struct nfs4_layoutget *lgp; - struct pnfs_layout_segment *lseg; loff_t i_size; dprintk("--> %s\n", __func__); @@ -834,40 +821,31 @@ send_layoutget(struct pnfs_layout_hdr *lo, * store in lseg. If we race with a concurrent seqid morphing * op, then re-send the LAYOUTGET. */ - do { - lgp = kzalloc(sizeof(*lgp), gfp_flags); - if (lgp == NULL) - return NULL; - - i_size = i_size_read(ino); - - lgp->args.minlength = PAGE_SIZE; - if (lgp->args.minlength > range->length) - lgp->args.minlength = range->length; - if (range->iomode == IOMODE_READ) { - if (range->offset >= i_size) - lgp->args.minlength = 0; - else if (i_size - range->offset < lgp->args.minlength) - lgp->args.minlength = i_size - range->offset; - } - lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE; - pnfs_copy_range(&lgp->args.range, range); - lgp->args.type = server->pnfs_curr_ld->id; - lgp->args.inode = ino; - lgp->args.ctx = get_nfs_open_context(ctx); - lgp->gfp_flags = gfp_flags; - lgp->cred = lo->plh_lc_cred; - - lseg = nfs4_proc_layoutget(lgp, gfp_flags); - } while (lseg == ERR_PTR(-EAGAIN)); - - if (IS_ERR(lseg) && !nfs_error_is_fatal(PTR_ERR(lseg))) - lseg = NULL; - else - pnfs_layout_clear_fail_bit(lo, - pnfs_iomode_to_fail_bit(range->iomode)); + lgp = kzalloc(sizeof(*lgp), gfp_flags); + if (lgp == NULL) + return ERR_PTR(-ENOMEM); - return lseg; + i_size = i_size_read(ino); + + lgp->args.minlength = PAGE_SIZE; + if (lgp->args.minlength > range->length) + lgp->args.minlength = range->length; + if (range->iomode == IOMODE_READ) { + if (range->offset >= i_size) + lgp->args.minlength = 0; + else if (i_size - range->offset < lgp->args.minlength) + lgp->args.minlength = i_size - range->offset; + } + lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE; + pnfs_copy_range(&lgp->args.range, range); + lgp->args.type = server->pnfs_curr_ld->id; + lgp->args.inode = ino; + lgp->args.ctx = get_nfs_open_context(ctx); + nfs4_stateid_copy(&lgp->args.stateid, stateid); + lgp->gfp_flags = gfp_flags; + lgp->cred = lo->plh_lc_cred; + + return nfs4_proc_layoutget(lgp, timeout, gfp_flags); } static void pnfs_clear_layoutcommit(struct inode *inode, @@ -899,6 +877,7 @@ pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo) if (test_and_set_bit(NFS_LAYOUT_RETURN, &lo->plh_flags)) return false; lo->plh_return_iomode = 0; + lo->plh_return_seq = 0; pnfs_get_layout_hdr(lo); clear_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags); return true; @@ -969,6 +948,7 @@ static void pnfs_layoutreturn_before_put_layout_hdr(struct pnfs_layout_hdr *lo) bool send; nfs4_stateid_copy(&stateid, &lo->plh_stateid); + stateid.seqid = cpu_to_be32(lo->plh_return_seq); iomode = lo->plh_return_iomode; send = pnfs_prepare_layoutreturn(lo); spin_unlock(&inode->i_lock); @@ -1012,7 +992,7 @@ _pnfs_return_layout(struct inode *ino) pnfs_get_layout_hdr(lo); empty = list_empty(&lo->plh_segs); pnfs_clear_layoutcommit(ino, &tmp_list); - pnfs_mark_matching_lsegs_invalid(lo, &tmp_list, NULL); + pnfs_mark_matching_lsegs_invalid(lo, &tmp_list, NULL, 0); if (NFS_SERVER(ino)->pnfs_curr_ld->return_range) { struct pnfs_layout_range range = { @@ -1341,23 +1321,28 @@ out_existing: /* * iomode matching rules: - * iomode lseg match - * ----- ----- ----- - * ANY READ true - * ANY RW true - * RW READ false - * RW RW true - * READ READ true - * READ RW true + * iomode lseg strict match + * iomode + * ----- ----- ------ ----- + * ANY READ N/A true + * ANY RW N/A true + * RW READ N/A false + * RW RW N/A true + * READ READ N/A true + * READ RW true false + * READ RW false true */ static bool pnfs_lseg_range_match(const struct pnfs_layout_range *ls_range, - const struct pnfs_layout_range *range) + const struct pnfs_layout_range *range, + bool strict_iomode) { struct pnfs_layout_range range1; if ((range->iomode == IOMODE_RW && ls_range->iomode != IOMODE_RW) || + (range->iomode != ls_range->iomode && + strict_iomode == true) || !pnfs_lseg_range_intersecting(ls_range, range)) return 0; @@ -1372,7 +1357,8 @@ pnfs_lseg_range_match(const struct pnfs_layout_range *ls_range, */ static struct pnfs_layout_segment * pnfs_find_lseg(struct pnfs_layout_hdr *lo, - struct pnfs_layout_range *range) + struct pnfs_layout_range *range, + bool strict_iomode) { struct pnfs_layout_segment *lseg, *ret = NULL; @@ -1381,7 +1367,8 @@ pnfs_find_lseg(struct pnfs_layout_hdr *lo, list_for_each_entry(lseg, &lo->plh_segs, pls_list) { if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags) && !test_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) && - pnfs_lseg_range_match(&lseg->pls_range, range)) { + pnfs_lseg_range_match(&lseg->pls_range, range, + strict_iomode)) { ret = pnfs_get_lseg(lseg); break; } @@ -1498,6 +1485,7 @@ pnfs_update_layout(struct inode *ino, loff_t pos, u64 count, enum pnfs_iomode iomode, + bool strict_iomode, gfp_t gfp_flags) { struct pnfs_layout_range arg = { @@ -1505,27 +1493,30 @@ pnfs_update_layout(struct inode *ino, .offset = pos, .length = count, }; - unsigned pg_offset; + unsigned pg_offset, seq; struct nfs_server *server = NFS_SERVER(ino); struct nfs_client *clp = server->nfs_client; - struct pnfs_layout_hdr *lo; + struct pnfs_layout_hdr *lo = NULL; struct pnfs_layout_segment *lseg = NULL; + nfs4_stateid stateid; + long timeout = 0; + unsigned long giveup = jiffies + rpc_get_timeout(server->client); bool first; if (!pnfs_enabled_sb(NFS_SERVER(ino))) { - trace_pnfs_update_layout(ino, pos, count, iomode, NULL, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_NO_PNFS); goto out; } if (iomode == IOMODE_READ && i_size_read(ino) == 0) { - trace_pnfs_update_layout(ino, pos, count, iomode, NULL, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RD_ZEROLEN); goto out; } if (pnfs_within_mdsthreshold(ctx, ino, iomode)) { - trace_pnfs_update_layout(ino, pos, count, iomode, NULL, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_MDSTHRESH); goto out; } @@ -1536,14 +1527,14 @@ lookup_again: lo = pnfs_find_alloc_layout(ino, ctx, gfp_flags); if (lo == NULL) { spin_unlock(&ino->i_lock); - trace_pnfs_update_layout(ino, pos, count, iomode, NULL, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_NOMEM); goto out; } /* Do we even need to bother with this? */ if (test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags)) { - trace_pnfs_update_layout(ino, pos, count, iomode, lo, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_BULK_RECALL); dprintk("%s matches recall, use MDS\n", __func__); goto out_unlock; @@ -1551,14 +1542,34 @@ lookup_again: /* if LAYOUTGET already failed once we don't try again */ if (pnfs_layout_io_test_failed(lo, iomode)) { - trace_pnfs_update_layout(ino, pos, count, iomode, lo, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_IO_TEST_FAIL); goto out_unlock; } - first = list_empty(&lo->plh_segs); - if (first) { - /* The first layoutget for the file. Need to serialize per + lseg = pnfs_find_lseg(lo, &arg, strict_iomode); + if (lseg) { + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, + PNFS_UPDATE_LAYOUT_FOUND_CACHED); + goto out_unlock; + } + + if (!nfs4_valid_open_stateid(ctx->state)) { + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, + PNFS_UPDATE_LAYOUT_INVALID_OPEN); + goto out_unlock; + } + + /* + * Choose a stateid for the LAYOUTGET. If we don't have a layout + * stateid, or it has been invalidated, then we must use the open + * stateid. + */ + if (lo->plh_stateid.seqid == 0 || + test_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags)) { + + /* + * The first layoutget for the file. Need to serialize per * RFC 5661 Errata 3208. */ if (test_and_set_bit(NFS_LAYOUT_FIRST_LAYOUTGET, @@ -1567,18 +1578,17 @@ lookup_again: wait_on_bit(&lo->plh_flags, NFS_LAYOUT_FIRST_LAYOUTGET, TASK_UNINTERRUPTIBLE); pnfs_put_layout_hdr(lo); + dprintk("%s retrying\n", __func__); goto lookup_again; } + + first = true; + do { + seq = read_seqbegin(&ctx->state->seqlock); + nfs4_stateid_copy(&stateid, &ctx->state->stateid); + } while (read_seqretry(&ctx->state->seqlock, seq)); } else { - /* Check to see if the layout for the given range - * already exists - */ - lseg = pnfs_find_lseg(lo, &arg); - if (lseg) { - trace_pnfs_update_layout(ino, pos, count, iomode, lo, - PNFS_UPDATE_LAYOUT_FOUND_CACHED); - goto out_unlock; - } + nfs4_stateid_copy(&stateid, &lo->plh_stateid); } /* @@ -1593,15 +1603,17 @@ lookup_again: pnfs_clear_first_layoutget(lo); pnfs_put_layout_hdr(lo); dprintk("%s retrying\n", __func__); + trace_pnfs_update_layout(ino, pos, count, iomode, lo, + lseg, PNFS_UPDATE_LAYOUT_RETRY); goto lookup_again; } - trace_pnfs_update_layout(ino, pos, count, iomode, lo, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RETURN); goto out_put_layout_hdr; } if (pnfs_layoutgets_blocked(lo)) { - trace_pnfs_update_layout(ino, pos, count, iomode, lo, + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_BLOCKED); goto out_unlock; } @@ -1626,10 +1638,36 @@ lookup_again: if (arg.length != NFS4_MAX_UINT64) arg.length = PAGE_ALIGN(arg.length); - lseg = send_layoutget(lo, ctx, &arg, gfp_flags); - atomic_dec(&lo->plh_outstanding); - trace_pnfs_update_layout(ino, pos, count, iomode, lo, + lseg = send_layoutget(lo, ctx, &stateid, &arg, &timeout, gfp_flags); + trace_pnfs_update_layout(ino, pos, count, iomode, lo, lseg, PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET); + if (IS_ERR(lseg)) { + switch(PTR_ERR(lseg)) { + case -ERECALLCONFLICT: + if (time_after(jiffies, giveup)) + lseg = NULL; + /* Fallthrough */ + case -EAGAIN: + pnfs_put_layout_hdr(lo); + if (first) + pnfs_clear_first_layoutget(lo); + if (lseg) { + trace_pnfs_update_layout(ino, pos, count, + iomode, lo, lseg, PNFS_UPDATE_LAYOUT_RETRY); + goto lookup_again; + } + /* Fallthrough */ + default: + if (!nfs_error_is_fatal(PTR_ERR(lseg))) { + pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode)); + lseg = NULL; + } + } + } else { + pnfs_layout_clear_fail_bit(lo, pnfs_iomode_to_fail_bit(iomode)); + } + + atomic_dec(&lo->plh_outstanding); out_put_layout_hdr: if (first) pnfs_clear_first_layoutget(lo); @@ -1678,38 +1716,36 @@ pnfs_layout_process(struct nfs4_layoutget *lgp) struct pnfs_layout_segment *lseg; struct inode *ino = lo->plh_inode; LIST_HEAD(free_me); - int status = -EINVAL; if (!pnfs_sanity_check_layout_range(&res->range)) - goto out; + return ERR_PTR(-EINVAL); /* Inject layout blob into I/O device driver */ lseg = NFS_SERVER(ino)->pnfs_curr_ld->alloc_lseg(lo, res, lgp->gfp_flags); - if (!lseg || IS_ERR(lseg)) { + if (IS_ERR_OR_NULL(lseg)) { if (!lseg) - status = -ENOMEM; - else - status = PTR_ERR(lseg); - dprintk("%s: Could not allocate layout: error %d\n", - __func__, status); - goto out; + lseg = ERR_PTR(-ENOMEM); + + dprintk("%s: Could not allocate layout: error %ld\n", + __func__, PTR_ERR(lseg)); + return lseg; } init_lseg(lo, lseg); lseg->pls_range = res->range; + lseg->pls_seq = be32_to_cpu(res->stateid.seqid); spin_lock(&ino->i_lock); if (pnfs_layoutgets_blocked(lo)) { dprintk("%s forget reply due to state\n", __func__); - goto out_forget_reply; + goto out_forget; } if (nfs4_stateid_match_other(&lo->plh_stateid, &res->stateid)) { /* existing state ID, make sure the sequence number matches. */ if (pnfs_layout_stateid_blocked(lo, &res->stateid)) { dprintk("%s forget reply due to sequence\n", __func__); - status = -EAGAIN; - goto out_forget_reply; + goto out_forget; } pnfs_set_layout_stateid(lo, &res->stateid, false); } else { @@ -1718,7 +1754,7 @@ pnfs_layout_process(struct nfs4_layoutget *lgp) * inode invalid, and don't bother validating the stateid * sequence number. */ - pnfs_mark_matching_lsegs_invalid(lo, &free_me, NULL); + pnfs_mark_matching_lsegs_invalid(lo, &free_me, NULL, 0); nfs4_stateid_copy(&lo->plh_stateid, &res->stateid); lo->plh_barrier = be32_to_cpu(res->stateid.seqid); @@ -1735,18 +1771,17 @@ pnfs_layout_process(struct nfs4_layoutget *lgp) spin_unlock(&ino->i_lock); pnfs_free_lseg_list(&free_me); return lseg; -out: - return ERR_PTR(status); -out_forget_reply: +out_forget: spin_unlock(&ino->i_lock); lseg->pls_layout = lo; NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg); - goto out; + return ERR_PTR(-EAGAIN); } static void -pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode) +pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode, + u32 seq) { if (lo->plh_return_iomode == iomode) return; @@ -1754,6 +1789,8 @@ pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode) iomode = IOMODE_ANY; lo->plh_return_iomode = iomode; set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags); + if (!lo->plh_return_seq || pnfs_seqid_is_newer(seq, lo->plh_return_seq)) + lo->plh_return_seq = seq; } /** @@ -1769,7 +1806,8 @@ pnfs_set_plh_return_iomode(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode) int pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo, struct list_head *tmp_list, - const struct pnfs_layout_range *return_range) + const struct pnfs_layout_range *return_range, + u32 seq) { struct pnfs_layout_segment *lseg, *next; int remaining = 0; @@ -1792,8 +1830,11 @@ pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo, continue; remaining++; set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags); - pnfs_set_plh_return_iomode(lo, return_range->iomode); } + + if (remaining) + pnfs_set_plh_return_info(lo, return_range->iomode, seq); + return remaining; } @@ -1810,13 +1851,14 @@ void pnfs_error_mark_layout_for_return(struct inode *inode, bool return_now = false; spin_lock(&inode->i_lock); - pnfs_set_plh_return_iomode(lo, range.iomode); + pnfs_set_plh_return_info(lo, range.iomode, lseg->pls_seq); /* * mark all matching lsegs so that we are sure to have no live * segments at hand when sending layoutreturn. See pnfs_put_lseg() * for how it works. */ - if (!pnfs_mark_matching_lsegs_return(lo, &free_me, &range)) { + if (!pnfs_mark_matching_lsegs_return(lo, &free_me, + &range, lseg->pls_seq)) { nfs4_stateid stateid; enum pnfs_iomode iomode = lo->plh_return_iomode; @@ -1849,6 +1891,7 @@ pnfs_generic_pg_init_read(struct nfs_pageio_descriptor *pgio, struct nfs_page *r req_offset(req), rd_size, IOMODE_READ, + false, GFP_KERNEL); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); @@ -1873,6 +1916,7 @@ pnfs_generic_pg_init_write(struct nfs_pageio_descriptor *pgio, req_offset(req), wb_size, IOMODE_RW, + false, GFP_NOFS); if (IS_ERR(pgio->pg_lseg)) { pgio->pg_error = PTR_ERR(pgio->pg_lseg); @@ -2143,12 +2187,15 @@ pnfs_try_to_read_data(struct nfs_pgio_header *hdr, } /* Resend all requests through pnfs. */ -int pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr) +void pnfs_read_resend_pnfs(struct nfs_pgio_header *hdr) { struct nfs_pageio_descriptor pgio; - nfs_pageio_init_read(&pgio, hdr->inode, false, hdr->completion_ops); - return nfs_pageio_resend(&pgio, hdr); + if (!test_and_set_bit(NFS_IOHDR_REDO, &hdr->flags)) { + nfs_pageio_init_read(&pgio, hdr->inode, false, + hdr->completion_ops); + hdr->task.tk_status = nfs_pageio_resend(&pgio, hdr); + } } EXPORT_SYMBOL_GPL(pnfs_read_resend_pnfs); @@ -2158,12 +2205,11 @@ pnfs_do_read(struct nfs_pageio_descriptor *desc, struct nfs_pgio_header *hdr) const struct rpc_call_ops *call_ops = desc->pg_rpc_callops; struct pnfs_layout_segment *lseg = desc->pg_lseg; enum pnfs_try_status trypnfs; - int err = 0; trypnfs = pnfs_try_to_read_data(hdr, call_ops, lseg); if (trypnfs == PNFS_TRY_AGAIN) - err = pnfs_read_resend_pnfs(hdr); - if (trypnfs == PNFS_NOT_ATTEMPTED || err) + pnfs_read_resend_pnfs(hdr); + if (trypnfs == PNFS_NOT_ATTEMPTED || hdr->task.tk_status) pnfs_read_through_mds(desc, hdr); } @@ -2405,7 +2451,7 @@ pnfs_report_layoutstat(struct inode *inode, gfp_t gfp_flags) spin_lock(&inode->i_lock); if (!NFS_I(inode)->layout) { spin_unlock(&inode->i_lock); - goto out; + goto out_clear_layoutstats; } hdr = NFS_I(inode)->layout; pnfs_get_layout_hdr(hdr); @@ -2434,6 +2480,7 @@ out_free: kfree(data); out_put: pnfs_put_layout_hdr(hdr); +out_clear_layoutstats: smp_mb__before_atomic(); clear_bit(NFS_INO_LAYOUTSTATS, &nfsi->flags); smp_mb__after_atomic(); diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h index 1ac1db5f6dad..b21bd0bee784 100644 --- a/fs/nfs/pnfs.h +++ b/fs/nfs/pnfs.h @@ -64,6 +64,7 @@ struct pnfs_layout_segment { struct list_head pls_lc_list; struct pnfs_layout_range pls_range; atomic_t pls_refcount; + u32 pls_seq; unsigned long pls_flags; struct pnfs_layout_hdr *pls_layout; struct work_struct pls_work; @@ -194,6 +195,7 @@ struct pnfs_layout_hdr { unsigned long plh_flags; nfs4_stateid plh_stateid; u32 plh_barrier; /* ignore lower seqids */ + u32 plh_return_seq; enum pnfs_iomode plh_return_iomode; loff_t plh_lwb; /* last write byte for layoutcommit */ struct rpc_cred *plh_lc_cred; /* layoutcommit cred */ @@ -226,7 +228,7 @@ extern void pnfs_unregister_layoutdriver(struct pnfs_layoutdriver_type *); extern int nfs4_proc_getdeviceinfo(struct nfs_server *server, struct pnfs_device *dev, struct rpc_cred *cred); -extern struct pnfs_layout_segment* nfs4_proc_layoutget(struct nfs4_layoutget *lgp, gfp_t gfp_flags); +extern struct pnfs_layout_segment* nfs4_proc_layoutget(struct nfs4_layoutget *lgp, long *timeout, gfp_t gfp_flags); extern int nfs4_proc_layoutreturn(struct nfs4_layoutreturn *lrp, bool sync); /* pnfs.c */ @@ -258,16 +260,14 @@ void pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo); void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo, const nfs4_stateid *new, bool update_barrier); -int pnfs_choose_layoutget_stateid(nfs4_stateid *dst, - struct pnfs_layout_hdr *lo, - const struct pnfs_layout_range *range, - struct nfs4_state *open_state); int pnfs_mark_matching_lsegs_invalid(struct pnfs_layout_hdr *lo, struct list_head *tmp_list, - const struct pnfs_layout_range *recall_range); + const struct pnfs_layout_range *recall_range, + u32 seq); int pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo, struct list_head *tmp_list, - const struct pnfs_layout_range *recall_range); + const struct pnfs_layout_range *recall_range, + u32 seq); bool pnfs_roc(struct inode *ino); void pnfs_roc_release(struct inode *ino); void pnfs_roc_set_barrier(struct inode *ino, u32 barrier); @@ -282,12 +282,13 @@ int _pnfs_return_layout(struct inode *); int pnfs_commit_and_return_layout(struct inode *); void pnfs_ld_write_done(struct nfs_pgio_header *); void pnfs_ld_read_done(struct nfs_pgio_header *); -int pnfs_read_resend_pnfs(struct nfs_pgio_header *); +void pnfs_read_resend_pnfs(struct nfs_pgio_header *); struct pnfs_layout_segment *pnfs_update_layout(struct inode *ino, struct nfs_open_context *ctx, loff_t pos, u64 count, enum pnfs_iomode iomode, + bool strict_iomode, gfp_t gfp_flags); void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo); diff --git a/fs/nfs/pnfs_nfs.c b/fs/nfs/pnfs_nfs.c index 4aaed890048f..0dfc476da3e1 100644 --- a/fs/nfs/pnfs_nfs.c +++ b/fs/nfs/pnfs_nfs.c @@ -61,7 +61,7 @@ EXPORT_SYMBOL_GPL(pnfs_generic_commit_release); /* The generic layer is about to remove the req from the commit list. * If this will make the bucket empty, it will need to put the lseg reference. - * Note this must be called holding the inode (/cinfo) lock + * Note this must be called holding i_lock */ void pnfs_generic_clear_request_commit(struct nfs_page *req, @@ -98,7 +98,7 @@ pnfs_generic_transfer_commit_list(struct list_head *src, struct list_head *dst, if (!nfs_lock_request(req)) continue; kref_get(&req->wb_kref); - if (cond_resched_lock(cinfo->lock)) + if (cond_resched_lock(&cinfo->inode->i_lock)) list_safe_reset_next(req, tmp, wb_list); nfs_request_remove_commit_list(req, cinfo); clear_bit(PG_COMMIT_TO_DS, &req->wb_flags); @@ -119,7 +119,7 @@ pnfs_generic_scan_ds_commit_list(struct pnfs_commit_bucket *bucket, struct list_head *dst = &bucket->committing; int ret; - lockdep_assert_held(cinfo->lock); + lockdep_assert_held(&cinfo->inode->i_lock); ret = pnfs_generic_transfer_commit_list(src, dst, cinfo, max); if (ret) { cinfo->ds->nwritten -= ret; @@ -142,7 +142,7 @@ int pnfs_generic_scan_commit_lists(struct nfs_commit_info *cinfo, { int i, rv = 0, cnt; - lockdep_assert_held(cinfo->lock); + lockdep_assert_held(&cinfo->inode->i_lock); for (i = 0; i < cinfo->ds->nbuckets && max != 0; i++) { cnt = pnfs_generic_scan_ds_commit_list(&cinfo->ds->buckets[i], cinfo, max); @@ -161,16 +161,16 @@ void pnfs_generic_recover_commit_reqs(struct list_head *dst, struct pnfs_layout_segment *freeme; int i; - lockdep_assert_held(cinfo->lock); + lockdep_assert_held(&cinfo->inode->i_lock); restart: for (i = 0, b = cinfo->ds->buckets; i < cinfo->ds->nbuckets; i++, b++) { if (pnfs_generic_transfer_commit_list(&b->written, dst, cinfo, 0)) { freeme = b->wlseg; b->wlseg = NULL; - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); pnfs_put_lseg(freeme); - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); goto restart; } } @@ -186,7 +186,7 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx) LIST_HEAD(pages); int i; - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); for (i = idx; i < fl_cinfo->nbuckets; i++) { bucket = &fl_cinfo->buckets[i]; if (list_empty(&bucket->committing)) @@ -194,12 +194,12 @@ static void pnfs_generic_retry_commit(struct nfs_commit_info *cinfo, int idx) freeme = bucket->clseg; bucket->clseg = NULL; list_splice_init(&bucket->committing, &pages); - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); nfs_retry_commit(&pages, freeme, cinfo, i); pnfs_put_lseg(freeme); - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); } - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); } static unsigned int @@ -238,14 +238,31 @@ void pnfs_fetch_commit_bucket_list(struct list_head *pages, struct pnfs_commit_bucket *bucket; bucket = &cinfo->ds->buckets[data->ds_commit_index]; - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); list_splice_init(&bucket->committing, pages); data->lseg = bucket->clseg; bucket->clseg = NULL; - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); } +/* Helper function for pnfs_generic_commit_pagelist to catch an empty + * page list. This can happen when two commits race. */ +static bool +pnfs_generic_commit_cancel_empty_pagelist(struct list_head *pages, + struct nfs_commit_data *data, + struct nfs_commit_info *cinfo) +{ + if (list_empty(pages)) { + if (atomic_dec_and_test(&cinfo->mds->rpcs_out)) + wake_up_atomic_t(&cinfo->mds->rpcs_out); + nfs_commitdata_release(data); + return true; + } + + return false; +} + /* This follows nfs_commit_list pretty closely */ int pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages, @@ -280,6 +297,11 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages, list_for_each_entry_safe(data, tmp, &list, pages) { list_del_init(&data->pages); if (data->ds_commit_index < 0) { + /* another commit raced with us */ + if (pnfs_generic_commit_cancel_empty_pagelist(mds_pages, + data, cinfo)) + continue; + nfs_init_commit(data, mds_pages, NULL, cinfo); nfs_initiate_commit(NFS_CLIENT(inode), data, NFS_PROTO(data->inode), @@ -288,6 +310,12 @@ pnfs_generic_commit_pagelist(struct inode *inode, struct list_head *mds_pages, LIST_HEAD(pages); pnfs_fetch_commit_bucket_list(&pages, data, cinfo); + + /* another commit raced with us */ + if (pnfs_generic_commit_cancel_empty_pagelist(&pages, + data, cinfo)) + continue; + nfs_init_commit(data, &pages, data->lseg, cinfo); initiate_commit(data, how); } @@ -874,12 +902,12 @@ pnfs_layout_mark_request_commit(struct nfs_page *req, struct list_head *list; struct pnfs_commit_bucket *buckets; - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); buckets = cinfo->ds->buckets; list = &buckets[ds_commit_idx].written; if (list_empty(list)) { if (!pnfs_is_valid_lseg(lseg)) { - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); cinfo->completion_ops->resched_write(cinfo, req); return; } @@ -896,7 +924,7 @@ pnfs_layout_mark_request_commit(struct nfs_page *req, cinfo->ds->nwritten++; nfs_request_add_commit_list_locked(req, list, cinfo); - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); nfs_mark_page_unstable(req->wb_page, cinfo); } EXPORT_SYMBOL_GPL(pnfs_layout_mark_request_commit); diff --git a/fs/nfs/super.c b/fs/nfs/super.c index f1268280244e..2137e0202f25 100644 --- a/fs/nfs/super.c +++ b/fs/nfs/super.c @@ -191,6 +191,7 @@ static const match_table_t nfs_mount_option_tokens = { enum { Opt_xprt_udp, Opt_xprt_udp6, Opt_xprt_tcp, Opt_xprt_tcp6, Opt_xprt_rdma, + Opt_xprt_rdma6, Opt_xprt_err }; @@ -201,6 +202,7 @@ static const match_table_t nfs_xprt_protocol_tokens = { { Opt_xprt_tcp, "tcp" }, { Opt_xprt_tcp6, "tcp6" }, { Opt_xprt_rdma, "rdma" }, + { Opt_xprt_rdma6, "rdma6" }, { Opt_xprt_err, NULL } }; @@ -1456,6 +1458,8 @@ static int nfs_parse_mount_options(char *raw, mnt->flags |= NFS_MOUNT_TCP; mnt->nfs_server.protocol = XPRT_TRANSPORT_TCP; break; + case Opt_xprt_rdma6: + protofamily = AF_INET6; case Opt_xprt_rdma: /* vector side protocols to TCP */ mnt->flags |= NFS_MOUNT_TCP; @@ -2408,6 +2412,11 @@ static int nfs_compare_super_address(struct nfs_server *server1, struct nfs_server *server2) { struct sockaddr *sap1, *sap2; + struct rpc_xprt *xprt1 = server1->client->cl_xprt; + struct rpc_xprt *xprt2 = server2->client->cl_xprt; + + if (!net_eq(xprt1->xprt_net, xprt2->xprt_net)) + return 0; sap1 = (struct sockaddr *)&server1->nfs_client->cl_addr; sap2 = (struct sockaddr *)&server2->nfs_client->cl_addr; diff --git a/fs/nfs/write.c b/fs/nfs/write.c index 5f4fd53e5764..e1c74d3db64d 100644 --- a/fs/nfs/write.c +++ b/fs/nfs/write.c @@ -245,8 +245,7 @@ static void nfs_mark_uptodate(struct nfs_page *req) static int wb_priority(struct writeback_control *wbc) { int ret = 0; - if (wbc->for_reclaim) - return FLUSH_HIGHPRI | FLUSH_COND_STABLE; + if (wbc->sync_mode == WB_SYNC_ALL) ret = FLUSH_COND_STABLE; return ret; @@ -737,7 +736,7 @@ static void nfs_inode_remove_request(struct nfs_page *req) head = req->wb_head; spin_lock(&inode->i_lock); - if (likely(!PageSwapCache(head->wb_page))) { + if (likely(head->wb_page && !PageSwapCache(head->wb_page))) { set_page_private(head->wb_page, 0); ClearPagePrivate(head->wb_page); smp_mb__after_atomic(); @@ -759,7 +758,8 @@ static void nfs_inode_remove_request(struct nfs_page *req) static void nfs_mark_request_dirty(struct nfs_page *req) { - __set_page_dirty_nobuffers(req->wb_page); + if (req->wb_page) + __set_page_dirty_nobuffers(req->wb_page); } /* @@ -804,7 +804,7 @@ nfs_page_search_commits_for_head_request_locked(struct nfs_inode *nfsi, * number of outstanding requests requiring a commit as well as * the MM page stats. * - * The caller must hold the cinfo->lock, and the nfs_page lock. + * The caller must hold cinfo->inode->i_lock, and the nfs_page lock. */ void nfs_request_add_commit_list_locked(struct nfs_page *req, struct list_head *dst, @@ -832,10 +832,11 @@ EXPORT_SYMBOL_GPL(nfs_request_add_commit_list_locked); void nfs_request_add_commit_list(struct nfs_page *req, struct nfs_commit_info *cinfo) { - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); nfs_request_add_commit_list_locked(req, &cinfo->mds->list, cinfo); - spin_unlock(cinfo->lock); - nfs_mark_page_unstable(req->wb_page, cinfo); + spin_unlock(&cinfo->inode->i_lock); + if (req->wb_page) + nfs_mark_page_unstable(req->wb_page, cinfo); } EXPORT_SYMBOL_GPL(nfs_request_add_commit_list); @@ -864,7 +865,7 @@ EXPORT_SYMBOL_GPL(nfs_request_remove_commit_list); static void nfs_init_cinfo_from_inode(struct nfs_commit_info *cinfo, struct inode *inode) { - cinfo->lock = &inode->i_lock; + cinfo->inode = inode; cinfo->mds = &NFS_I(inode)->commit_info; cinfo->ds = pnfs_get_ds_info(inode); cinfo->dreq = NULL; @@ -967,7 +968,7 @@ nfs_reqs_to_commit(struct nfs_commit_info *cinfo) return cinfo->mds->ncommit; } -/* cinfo->lock held by caller */ +/* cinfo->inode->i_lock held by caller */ int nfs_scan_commit_list(struct list_head *src, struct list_head *dst, struct nfs_commit_info *cinfo, int max) @@ -979,7 +980,7 @@ nfs_scan_commit_list(struct list_head *src, struct list_head *dst, if (!nfs_lock_request(req)) continue; kref_get(&req->wb_kref); - if (cond_resched_lock(cinfo->lock)) + if (cond_resched_lock(&cinfo->inode->i_lock)) list_safe_reset_next(req, tmp, wb_list); nfs_request_remove_commit_list(req, cinfo); nfs_list_add_request(req, dst); @@ -1005,7 +1006,7 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst, { int ret = 0; - spin_lock(cinfo->lock); + spin_lock(&cinfo->inode->i_lock); if (cinfo->mds->ncommit > 0) { const int max = INT_MAX; @@ -1013,7 +1014,7 @@ nfs_scan_commit(struct inode *inode, struct list_head *dst, cinfo, max); ret += pnfs_scan_commit_lists(inode, cinfo, max - ret); } - spin_unlock(cinfo->lock); + spin_unlock(&cinfo->inode->i_lock); return ret; } @@ -1709,6 +1710,10 @@ nfs_commit_list(struct inode *inode, struct list_head *head, int how, { struct nfs_commit_data *data; + /* another commit raced with us */ + if (list_empty(head)) + return 0; + data = nfs_commitdata_alloc(); if (!data) @@ -1724,6 +1729,36 @@ nfs_commit_list(struct inode *inode, struct list_head *head, int how, return -ENOMEM; } +int nfs_commit_file(struct file *file, struct nfs_write_verifier *verf) +{ + struct inode *inode = file_inode(file); + struct nfs_open_context *open; + struct nfs_commit_info cinfo; + struct nfs_page *req; + int ret; + + open = get_nfs_open_context(nfs_file_open_context(file)); + req = nfs_create_request(open, NULL, NULL, 0, i_size_read(inode)); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + goto out_put; + } + + nfs_init_cinfo_from_inode(&cinfo, inode); + + memcpy(&req->wb_verf, verf, sizeof(struct nfs_write_verifier)); + nfs_request_add_commit_list(req, &cinfo); + ret = nfs_commit_inode(inode, FLUSH_SYNC); + if (ret > 0) + ret = 0; + + nfs_free_request(req); +out_put: + put_nfs_open_context(open); + return ret; +} +EXPORT_SYMBOL_GPL(nfs_commit_file); + /* * COMMIT call returned */ @@ -1748,7 +1783,8 @@ static void nfs_commit_release_pages(struct nfs_commit_data *data) while (!list_empty(&data->pages)) { req = nfs_list_entry(data->pages.next); nfs_list_remove_request(req); - nfs_clear_page_commit(req->wb_page); + if (req->wb_page) + nfs_clear_page_commit(req->wb_page); dprintk("NFS: commit (%s/%llu %d@%lld)", req->wb_context->dentry->d_sb->s_id, diff --git a/include/linux/errno.h b/include/linux/errno.h index 89627b9187f9..7ce9fb1b7d28 100644 --- a/include/linux/errno.h +++ b/include/linux/errno.h @@ -28,5 +28,6 @@ #define EBADTYPE 527 /* Type not supported by server */ #define EJUKEBOX 528 /* Request initiated, but will not complete before timeout */ #define EIOCBQUEUED 529 /* iocb queued, will get completion event */ +#define ERECALLCONFLICT 530 /* conflict with recalled state */ #endif diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h index 011433478a14..bfed6b367350 100644 --- a/include/linux/nfs4.h +++ b/include/linux/nfs4.h @@ -50,12 +50,27 @@ struct nfs4_label { typedef struct { char data[NFS4_VERIFIER_SIZE]; } nfs4_verifier; -struct nfs_stateid4 { - __be32 seqid; - char other[NFS4_STATEID_OTHER_SIZE]; -} __attribute__ ((packed)); +struct nfs4_stateid_struct { + union { + char data[NFS4_STATEID_SIZE]; + struct { + __be32 seqid; + char other[NFS4_STATEID_OTHER_SIZE]; + } __attribute__ ((packed)); + }; + + enum { + NFS4_INVALID_STATEID_TYPE = 0, + NFS4_SPECIAL_STATEID_TYPE, + NFS4_OPEN_STATEID_TYPE, + NFS4_LOCK_STATEID_TYPE, + NFS4_DELEGATION_STATEID_TYPE, + NFS4_LAYOUT_STATEID_TYPE, + NFS4_PNFS_DS_STATEID_TYPE, + } type; +}; -typedef struct nfs_stateid4 nfs4_stateid; +typedef struct nfs4_stateid_struct nfs4_stateid; enum nfs_opnum4 { OP_ACCESS = 3, @@ -504,6 +519,7 @@ enum { NFSPROC4_CLNT_DEALLOCATE, NFSPROC4_CLNT_LAYOUTSTATS, NFSPROC4_CLNT_CLONE, + NFSPROC4_CLNT_COPY, }; /* nfs41 types */ @@ -621,7 +637,9 @@ enum pnfs_update_layout_reason { PNFS_UPDATE_LAYOUT_IO_TEST_FAIL, PNFS_UPDATE_LAYOUT_FOUND_CACHED, PNFS_UPDATE_LAYOUT_RETURN, + PNFS_UPDATE_LAYOUT_RETRY, PNFS_UPDATE_LAYOUT_BLOCKED, + PNFS_UPDATE_LAYOUT_INVALID_OPEN, PNFS_UPDATE_LAYOUT_SEND_LAYOUTGET, }; diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h index 7fcc13c8cf1f..14a762d2734d 100644 --- a/include/linux/nfs_fs_sb.h +++ b/include/linux/nfs_fs_sb.h @@ -246,5 +246,6 @@ struct nfs_server { #define NFS_CAP_DEALLOCATE (1U << 21) #define NFS_CAP_LAYOUTSTATS (1U << 22) #define NFS_CAP_CLONE (1U << 23) +#define NFS_CAP_COPY (1U << 24) #endif diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h index ee8491dadbf3..c304a11b5b1a 100644 --- a/include/linux/nfs_xdr.h +++ b/include/linux/nfs_xdr.h @@ -233,7 +233,6 @@ struct nfs4_layoutget_args { struct inode *inode; struct nfs_open_context *ctx; nfs4_stateid stateid; - unsigned long timestamp; struct nfs4_layoutdriver_data layout; }; @@ -251,7 +250,6 @@ struct nfs4_layoutget { struct nfs4_layoutget_res res; struct rpc_cred *cred; gfp_t gfp_flags; - long timeout; }; struct nfs4_getdeviceinfo_args { @@ -1343,6 +1341,32 @@ struct nfs42_falloc_res { const struct nfs_server *falloc_server; }; +struct nfs42_copy_args { + struct nfs4_sequence_args seq_args; + + struct nfs_fh *src_fh; + nfs4_stateid src_stateid; + u64 src_pos; + + struct nfs_fh *dst_fh; + nfs4_stateid dst_stateid; + u64 dst_pos; + + u64 count; +}; + +struct nfs42_write_res { + u64 count; + struct nfs_writeverf verifier; +}; + +struct nfs42_copy_res { + struct nfs4_sequence_res seq_res; + struct nfs42_write_res write_res; + bool consecutive; + bool synchronous; +}; + struct nfs42_seek_args { struct nfs4_sequence_args seq_args; @@ -1431,7 +1455,7 @@ struct nfs_commit_completion_ops { }; struct nfs_commit_info { - spinlock_t *lock; /* inode->i_lock */ + struct inode *inode; /* Needed for inode->i_lock */ struct nfs_mds_commit_info *mds; struct pnfs_ds_commit_info *ds; struct nfs_direct_req *dreq; /* O_DIRECT request */ diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h index 6a241a277249..899791573a40 100644 --- a/include/linux/sunrpc/auth.h +++ b/include/linux/sunrpc/auth.h @@ -127,7 +127,7 @@ struct rpc_authops { void (*destroy)(struct rpc_auth *); struct rpc_cred * (*lookup_cred)(struct rpc_auth *, struct auth_cred *, int); - struct rpc_cred * (*crcreate)(struct rpc_auth*, struct auth_cred *, int); + struct rpc_cred * (*crcreate)(struct rpc_auth*, struct auth_cred *, int, gfp_t); int (*list_pseudoflavors)(rpc_authflavor_t *, int); rpc_authflavor_t (*info2flavor)(struct rpcsec_gss_info *); int (*flavor2info)(rpc_authflavor_t, @@ -167,6 +167,7 @@ void rpc_destroy_authunix(void); struct rpc_cred * rpc_lookup_cred(void); struct rpc_cred * rpc_lookup_cred_nonblock(void); +struct rpc_cred * rpc_lookup_generic_cred(struct auth_cred *, int, gfp_t); struct rpc_cred * rpc_lookup_machine_cred(const char *service_name); int rpcauth_register(const struct rpc_authops *); int rpcauth_unregister(const struct rpc_authops *); @@ -178,7 +179,7 @@ rpc_authflavor_t rpcauth_get_pseudoflavor(rpc_authflavor_t, int rpcauth_get_gssinfo(rpc_authflavor_t, struct rpcsec_gss_info *); int rpcauth_list_flavors(rpc_authflavor_t *, int); -struct rpc_cred * rpcauth_lookup_credcache(struct rpc_auth *, struct auth_cred *, int); +struct rpc_cred * rpcauth_lookup_credcache(struct rpc_auth *, struct auth_cred *, int, gfp_t); void rpcauth_init_cred(struct rpc_cred *, const struct auth_cred *, struct rpc_auth *, const struct rpc_credops *); struct rpc_cred * rpcauth_lookupcred(struct rpc_auth *, int); struct rpc_cred * rpcauth_generic_bind_cred(struct rpc_task *, struct rpc_cred *, int); @@ -201,9 +202,28 @@ char * rpcauth_stringify_acceptor(struct rpc_cred *); static inline struct rpc_cred * get_rpccred(struct rpc_cred *cred) { - atomic_inc(&cred->cr_count); + if (cred != NULL) + atomic_inc(&cred->cr_count); return cred; } +/** + * get_rpccred_rcu - get a reference to a cred using rcu-protected pointer + * @cred: cred of which to take a reference + * + * In some cases, we may have a pointer to a credential to which we + * want to take a reference, but don't already have one. Because these + * objects are freed using RCU, we can access the cr_count while its + * on its way to destruction and only take a reference if it's not already + * zero. + */ +static inline struct rpc_cred * +get_rpccred_rcu(struct rpc_cred *cred) +{ + if (atomic_inc_not_zero(&cred->cr_count)) + return cred; + return NULL; +} + #endif /* __KERNEL__ */ #endif /* _LINUX_SUNRPC_AUTH_H */ diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h index 9a7ddbaf116e..19c659d1c0f8 100644 --- a/include/linux/sunrpc/clnt.h +++ b/include/linux/sunrpc/clnt.h @@ -176,6 +176,7 @@ void rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int); int rpc_protocol(struct rpc_clnt *); struct net * rpc_net_ns(struct rpc_clnt *); size_t rpc_max_payload(struct rpc_clnt *); +size_t rpc_max_bc_payload(struct rpc_clnt *); unsigned long rpc_get_timeout(struct rpc_clnt *clnt); void rpc_force_rebind(struct rpc_clnt *); size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t); diff --git a/include/linux/sunrpc/msg_prot.h b/include/linux/sunrpc/msg_prot.h index 807371357160..59cbf16eaeb5 100644 --- a/include/linux/sunrpc/msg_prot.h +++ b/include/linux/sunrpc/msg_prot.h @@ -158,9 +158,9 @@ typedef __be32 rpc_fraghdr; /* * Note that RFC 1833 does not put any size restrictions on the - * netid string, but all currently defined netid's fit in 4 bytes. + * netid string, but all currently defined netid's fit in 5 bytes. */ -#define RPCBIND_MAXNETIDLEN (4u) +#define RPCBIND_MAXNETIDLEN (5u) /* * Universal addresses are introduced in RFC 1833 and further spelled diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h index fb0d212e0d3a..5aa3834619a8 100644 --- a/include/linux/sunrpc/xprt.h +++ b/include/linux/sunrpc/xprt.h @@ -142,6 +142,7 @@ struct rpc_xprt_ops { int (*bc_setup)(struct rpc_xprt *xprt, unsigned int min_reqs); int (*bc_up)(struct svc_serv *serv, struct net *net); + size_t (*bc_maxpayload)(struct rpc_xprt *xprt); void (*bc_free_rqst)(struct rpc_rqst *rqst); void (*bc_destroy)(struct rpc_xprt *xprt, unsigned int max_reqs); diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h index 767190b01363..39267dc3486a 100644 --- a/include/linux/sunrpc/xprtrdma.h +++ b/include/linux/sunrpc/xprtrdma.h @@ -52,7 +52,9 @@ #define RPCRDMA_DEF_SLOT_TABLE (128U) #define RPCRDMA_MAX_SLOT_TABLE (256U) -#define RPCRDMA_DEF_INLINE (1024) /* default inline max */ +#define RPCRDMA_MIN_INLINE (1024) /* min inline thresh */ +#define RPCRDMA_DEF_INLINE (1024) /* default inline thresh */ +#define RPCRDMA_MAX_INLINE (3068) /* max inline thresh */ /* Memory registration strategies, by number. * This is part of a kernel / user space API. Do not remove. */ diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 02f53674dc39..040ff627c18a 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void) */ struct rpc_cred * rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, - int flags) + int flags, gfp_t gfp) { LIST_HEAD(free); struct rpc_cred_cache *cache = auth->au_credcache; @@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, if (flags & RPCAUTH_LOOKUP_RCU) return ERR_PTR(-ECHILD); - new = auth->au_ops->crcreate(auth, acred, flags); + new = auth->au_ops->crcreate(auth, acred, flags, gfp); if (IS_ERR(new)) { cred = new; goto out; @@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) new = rpcauth_bind_new_cred(task, lookupflags); if (IS_ERR(new)) return PTR_ERR(new); - if (req->rq_cred != NULL) - put_rpccred(req->rq_cred); + put_rpccred(req->rq_cred); req->rq_cred = new; return 0; } @@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) void put_rpccred(struct rpc_cred *cred) { + if (cred == NULL) + return; /* Fast path for unhashed credentials */ if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) { if (atomic_dec_and_test(&cred->cr_count)) diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 41248b1820c7..54dd3fdead54 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void) } EXPORT_SYMBOL_GPL(rpc_lookup_cred); +struct rpc_cred * +rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp) +{ + return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp); +} +EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred); + struct rpc_cred *rpc_lookup_cred_nonblock(void) { return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU); @@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task, static struct rpc_cred * generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(&generic_auth, acred, flags); + return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL); } static struct rpc_cred * -generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct generic_cred *gcred; - gcred = kmalloc(sizeof(*gcred), GFP_KERNEL); + gcred = kmalloc(sizeof(*gcred), gfp); if (gcred == NULL) return ERR_PTR(-ENOMEM); diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 15612ffa8d57..e64ae93d5b4f 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred) static struct rpc_cred * gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth); struct gss_cred *cred = NULL; @@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) __func__, from_kuid(&init_user_ns, acred->uid), auth->au_flavor); - if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kzalloc(sizeof(*cred), gfp))) goto out_err; rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops); diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 0d3dd364c22f..9f65452b7cbc 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth) static struct rpc_cred * unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct unx_cred *cred; unsigned int groups = 0; @@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) from_kuid(&init_user_ns, acred->uid), from_kgid(&init_user_ns, acred->gid)); - if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kmalloc(sizeof(*cred), gfp))) return ERR_PTR(-ENOMEM); rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 7e0c9bf22df8..06b4df9faaa1 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1414,6 +1414,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt) EXPORT_SYMBOL_GPL(rpc_max_payload); /** + * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes + * @clnt: RPC client to query + */ +size_t rpc_max_bc_payload(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + size_t ret; + + rcu_read_lock(); + xprt = rcu_dereference(clnt->cl_xprt); + ret = xprt->ops->bc_maxpayload(xprt); + rcu_read_unlock(); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_max_bc_payload); + +/** * rpc_get_timeout - Get timeout for transport in units of HZ * @clnt: RPC client to query */ diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 6bdb3865212d..c4f3cc0c0775 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p) xdr_set_iov(xdr, buf->head, buf->len); else if (buf->page_len != 0) xdr_set_page_base(xdr, 0, buf->len); + else + xdr_set_iov(xdr, buf->head, buf->len); if (p != NULL && p > xdr->p && xdr->end >= p) { xdr->nwords -= p - xdr->p; xdr->p = p; diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 2dcd7640eeb5..87762d976b63 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) } /** + * xprt_rdma_bc_maxpayload - Return maximum backchannel message size + * @xprt: transport + * + * Returns maximum size, in bytes, of a backchannel message + */ +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + size_t maxmsg; + + maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize); + return maxmsg - RPCRDMA_HDRLEN_MIN; +} + +/** * rpcrdma_bc_marshal_reply - Send backwards direction reply * @rqst: buffer containing RPC reply data * diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index b289e106540b..6326ebe8b595 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -35,10 +35,71 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) +static struct workqueue_struct *fmr_recovery_wq; + +#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) + +int +fmr_alloc_recovery_wq(void) +{ + fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); + return !fmr_recovery_wq ? -ENOMEM : 0; +} + +void +fmr_destroy_recovery_wq(void) +{ + struct workqueue_struct *wq; + + if (!fmr_recovery_wq) + return; + + wq = fmr_recovery_wq; + fmr_recovery_wq = NULL; + destroy_workqueue(wq); +} + +static int +__fmr_unmap(struct rpcrdma_mw *mw) +{ + LIST_HEAD(l); + + list_add(&mw->fmr.fmr->list, &l); + return ib_unmap_fmr(&l); +} + +/* Deferred reset of a single FMR. Generate a fresh rkey by + * replacing the MR. There's no recovery if this fails. + */ +static void +__fmr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, + mw_work); + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + + __fmr_unmap(mw); + rpcrdma_put_mw(r_xprt, mw); + return; +} + +/* A broken MR was discovered in a context that can't sleep. + * Defer recovery to the recovery worker. + */ +static void +__fmr_queue_recovery(struct rpcrdma_mw *mw) +{ + INIT_WORK(&mw->mw_work, __fmr_recovery_worker); + queue_work(fmr_recovery_wq, &mw->mw_work); +} + static int fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + RPCRDMA_MAX_FMR_SGES)); return 0; } @@ -48,7 +109,7 @@ static size_t fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES); + RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); } static int @@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); } @@ -104,15 +166,6 @@ out: return rc; } -static int -__fmr_unmap(struct rpcrdma_mw *r) -{ - LIST_HEAD(l); - - list_add(&r->fmr.fmr->list, &l); - return ib_unmap_fmr(&l); -} - /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ @@ -183,15 +236,10 @@ static void __fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; int nsegs = seg->mr_nsegs; - seg->rl_mw = NULL; - while (nsegs--) rpcrdma_unmap_one(device, seg++); - - rpcrdma_put_mw(r_xprt, mw); } /* Invalidate all memory regions that were registered for "req". @@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, seg->rl_mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; + seg->rl_mw = NULL; } req->rl_nchunks = 0; } -/* Use the ib_unmap_fmr() verb to prevent further remote - * access via RDMA READ or RDMA WRITE. +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * In the asynchronous case, DMA unmapping occurs first here + * because the rpcrdma_mr_seg is released immediately after this + * call. It's contents won't be available in __fmr_dma_unmap later. + * FIXME. */ -static int -fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) +static void +fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->rl_mw; - int rc, nsegs = seg->mr_nsegs; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - dprintk("RPC: %s: FMR %p\n", __func__, mw); + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; - seg1->rl_mw = NULL; - while (seg1->mr_nsegs--) - rpcrdma_unmap_one(ia->ri_device, seg++); - rc = __fmr_unmap(mw); - if (rc) - goto out_err; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + if (sync) { + /* ORDER */ + __fmr_unmap(mw); + __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, mw); + } else { + __fmr_dma_unmap(r_xprt, seg); + __fmr_queue_recovery(mw); + } -out_err: - /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy - * will attempt to release it when the transport is destroyed. - */ - dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc); - return nsegs; + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, - .ro_unmap = fmr_op_unmap, + .ro_unmap_safe = fmr_op_unmap_safe, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, .ro_init = fmr_op_init, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 94c3fa910b85..c0947544babe 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void) destroy_workqueue(wq); } +static int +__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +{ + struct rpcrdma_frmr *f = &r->frmr; + int rc; + + rc = ib_dereg_mr(f->fr_mr); + if (rc) { + pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", + rc, r); + return rc; + } + + f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, + ia->ri_max_frmr_depth); + if (IS_ERR(f->fr_mr)) { + pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", + PTR_ERR(f->fr_mr), r); + return PTR_ERR(f->fr_mr); + } + + dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); + f->fr_state = FRMR_IS_INVALID; + return 0; +} + +static void +__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) +{ + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_frmr *f = &mw->frmr; + int rc; + + rc = __frwr_reset_mr(ia, mw); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir); + if (rc) + return; + + rpcrdma_put_mw(r_xprt, mw); +} + /* Deferred reset of a single FRMR. Generate a fresh rkey by * replacing the MR. * @@ -109,26 +150,10 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - - if (ib_dereg_mr(r->frmr.fr_mr)) - goto out_fail; + mw_work); - r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->frmr.fr_mr)) - goto out_fail; - - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->frmr.fr_state = FRMR_IS_INVALID; - rpcrdma_put_mw(r_xprt, r); + __frwr_reset_and_unmap(r->mw_xprt, r); return; - -out_fail: - pr_warn("RPC: %s: FRMR %p unrecovered\n", - __func__, r); } /* A broken MR was discovered in a context that can't sleep. @@ -137,8 +162,8 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->frmr.fr_work); + INIT_WORK(&r->mw_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->mw_work); } static int @@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, if (IS_ERR(f->fr_mr)) goto out_mr_err; - f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); - if (!f->sg) + f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL); + if (!f->fr_sg) goto out_list_err; - sg_init_table(f->sg, depth); + sg_init_table(f->fr_sg, depth); init_completion(&f->fr_linv_done); @@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r) if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->frmr.sg); + kfree(r->frmr.fr_sg); } static int @@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, depth; } + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + ia->ri_max_frmr_depth)); return 0; } @@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); + RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth); } static void @@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) return rc; } + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->frmr.fr_xprt = r_xprt; } return 0; @@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&frmr->sg[i], + sg_set_page(&frmr->fr_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&frmr->sg[i], seg->mr_offset, + sg_set_buf(&frmr->fr_sg[i], seg->mr_offset, seg->mr_len); ++seg; @@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - frmr->sg_nents = i; + frmr->fr_nents = i; + frmr->fr_dir = direction; - dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); + dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction); if (!dma_nents) { pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", - __func__, frmr->sg, frmr->sg_nents); + __func__, frmr->fr_sg, frmr->fr_nents); return -ENOMEM; } - n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); - if (unlikely(n != frmr->sg_nents)) { + n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE); + if (unlikely(n != frmr->fr_nents)) { pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", - __func__, frmr->fr_mr, n, frmr->sg_nents); + __func__, frmr->fr_mr, n, frmr->fr_nents); rc = n < 0 ? n : -EINVAL; goto out_senderr; } dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", - __func__, mw, frmr->sg_nents, mr->length); + __func__, mw, frmr->fr_nents, mr->length); key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); @@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - seg1->mr_dir = direction; seg1->rl_mw = mw; seg1->mr_rkey = mr->rkey; seg1->mr_base = mr->iova; - seg1->mr_nsegs = frmr->sg_nents; + seg1->mr_nsegs = frmr->fr_nents; seg1->mr_len = mr->length; - return frmr->sg_nents; + return frmr->fr_nents; out_senderr: dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction); __frwr_queue_recovery(mw); return rc; } @@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) return invalidate_wr; } -static void -__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int rc) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->frmr; - - seg->rl_mw = NULL; - - ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir); - - if (!rc) - rpcrdma_put_mw(r_xprt, mw); - else - __frwr_queue_recovery(mw); -} - /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the @@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct rpcrdma_mr_seg *seg; unsigned int i, nchunks; struct rpcrdma_frmr *f; + struct rpcrdma_mw *mw; int rc; dprintk("RPC: %s: req %p\n", __func__, req); @@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) { - pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - rdma_disconnect(ia->ri_id); - goto unmap; - } + if (rc) + goto reset_mrs; wait_for_completion(&f->fr_linv_done); @@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; + mw = seg->rl_mw; + seg->rl_mw = NULL; - __frwr_dma_unmap(r_xprt, seg, rc); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, + f->fr_dir); + rpcrdma_put_mw(r_xprt, mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; } req->rl_nchunks = 0; -} + return; -/* Post a LOCAL_INV Work Request to prevent further remote access - * via RDMA READ or RDMA WRITE. - */ -static int -frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->frmr; - struct ib_send_wr *invalidate_wr, *bad_wr; - int rc, nsegs = seg->mr_nsegs; +reset_mrs: + pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - dprintk("RPC: %s: FRMR %p\n", __func__, mw); + /* Find and reset the MRs in the LOCAL_INV WRs that did not + * get posted. This is synchronous, and slow. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; + f = &mw->frmr; - seg1->rl_mw = NULL; - frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->frmr.fr_invwr; + if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { + __frwr_reset_mr(ia, mw); + bad_wr = bad_wr->next; + } - memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - frmr->fr_cqe.done = frwr_wc_localinv; - invalidate_wr->wr_cqe = &frmr->fr_cqe; - invalidate_wr->opcode = IB_WR_LOCAL_INV; - invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; - DECR_CQCOUNT(&r_xprt->rx_ep); + i += seg->mr_nsegs; + } + goto unmap; +} - ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); - read_lock(&ia->ri_qplock); - rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); - read_unlock(&ia->ri_qplock); - if (rc) - goto out_err; +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + */ +static void +frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; -out_err: - dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - __frwr_queue_recovery(mw); - return nsegs; + if (sync) + __frwr_reset_and_unmap(r_xprt, mw); + else + __frwr_queue_recovery(mw); + + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, - .ro_unmap = frwr_op_unmap, + .ro_unmap_safe = frwr_op_unmap_safe, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, .ro_init = frwr_op_init, diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index 481b9b6f4a15..3750596cc432 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, __func__, PTR_ERR(mr)); return -ENOMEM; } - ia->ri_dma_mr = mr; + + rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int, + RPCRDMA_MAX_DATA_SEGS, + RPCRDMA_MAX_HDR_SEGS)); return 0; } @@ -47,7 +50,7 @@ static size_t physical_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt)); + RPCRDMA_MAX_HDR_SEGS); } static int @@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, return 1; } -/* Unmap a memory region, but leave it registered. - */ -static int -physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - - rpcrdma_unmap_one(ia->ri_device, seg); - return 1; -} - /* DMA unmap all memory regions that were mapped for "req". */ static void @@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) rpcrdma_unmap_one(device, &req->rl_segments[i++]); } +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * For physical memory registration, there is no good way to + * fence a single MR that has been advertised to the server. The + * client has already handed the server an R_key that cannot be + * invalidated and is shared by all MRs on this connection. + * Tearing down the PD might be the only safe choice, but it's + * not clear that a freshly acquired DMA R_key would be different + * than the one used by the PD that was just destroyed. + * FIXME. + */ +static void +physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + physical_op_unmap_sync(r_xprt, req); +} + static void physical_op_destroy(struct rpcrdma_buffer *buf) { @@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { .ro_map = physical_op_map, .ro_unmap_sync = physical_op_unmap_sync, - .ro_unmap = physical_op_unmap, + .ro_unmap_safe = physical_op_unmap_safe, .ro_open = physical_op_open, .ro_maxpages = physical_op_maxpages, .ro_init = physical_op_init, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 888823bb6dae..35a81096e83d 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -61,26 +61,84 @@ enum rpcrdma_chunktype { rpcrdma_replych }; -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static const char transfertypes[][12] = { - "pure inline", /* no chunks */ - " read chunk", /* some argument via rdma read */ - "*read chunk", /* entire request via rdma read */ - "write chunk", /* some result via rdma write */ + "inline", /* no chunks */ + "read list", /* some argument via rdma read */ + "*read list", /* entire request via rdma read */ + "write list", /* some result via rdma write */ "reply chunk" /* entire reply via rdma write */ }; -#endif + +/* Returns size of largest RPC-over-RDMA header in a Call message + * + * The largest Call header contains a full-size Read list and a + * minimal Reply chunk. + */ +static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Read list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = maxsegs * sizeof(struct rpcrdma_read_chunk); + + /* Minimal Read chunk size */ + size += sizeof(__be32); /* segment count */ + size += sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max call header size = %u\n", + __func__, size); + return size; +} + +/* Returns size of largest RPC-over-RDMA header in a Reply message + * + * There is only one Write list or one Reply chunk per Reply + * message. The larger list is the Write list. + */ +static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Write list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = sizeof(__be32); /* segment count */ + size += maxsegs * sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max reply header size = %u\n", + __func__, size); + return size; +} + +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia, + struct rpcrdma_create_data_internal *cdata, + unsigned int maxsegs) +{ + ia->ri_max_inline_write = cdata->inline_wsize - + rpcrdma_max_call_header_size(maxsegs); + ia->ri_max_inline_read = cdata->inline_rsize - + rpcrdma_max_reply_header_size(maxsegs); +} /* The client can send a request inline as long as the RPCRDMA header * plus the RPC call fit under the transport's inline limit. If the * combined call message size exceeds that limit, the client must use * the read chunk list for this operation. */ -static bool rpcrdma_args_inline(struct rpc_rqst *rqst) +static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + return rqst->rq_snd_buf.len <= ia->ri_max_inline_write; } /* The client can't know how large the actual reply will be. Thus it @@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst) * limit, the client must provide a write list or a reply chunk for * this request. */ -static bool rpcrdma_results_inline(struct rpc_rqst *rqst) +static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); + return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } static int @@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, return n; } -/* - * Create read/write chunk lists, and reply chunks, for RDMA - * - * Assume check against THRESHOLD has been done, and chunks are required. - * Assume only encoding one list entry for read|write chunks. The NFSv3 - * protocol is simple enough to allow this as it only has a single "bulk - * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The - * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) - * - * When used for a single reply chunk (which is a special write - * chunk used for the entire reply, rather than just the data), it - * is used primarily for READDIR and READLINK which would otherwise - * be severely size-limited by a small rdma inline read max. The server - * response will come back as an RDMA Write, followed by a message - * of type RDMA_NOMSG carrying the xid and length. As a result, reply - * chunks do not provide data alignment, however they do not require - * "fixup" (moving the response to the upper layer buffer) either. +static inline __be32 * +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg) +{ + *iptr++ = cpu_to_be32(seg->mr_rkey); + *iptr++ = cpu_to_be32(seg->mr_len); + return xdr_encode_hyper(iptr, seg->mr_base); +} + +/* XDR-encode the Read list. Supports encoding a list of read + * segments that belong to a single read chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Read list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype rtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + unsigned int pos; + int n, nsegs; + + if (rtype == rpcrdma_noch) { + *iptr++ = xdr_zero; /* item not present */ + return iptr; + } + + pos = rqst->rq_snd_buf.head[0].iov_len; + if (rtype == rpcrdma_areadch) + pos = 0; + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false); + if (n <= 0) + return ERR_PTR(n); + + *iptr++ = xdr_one; /* item present */ + + /* All read segments in this chunk + * have the same "position". + */ + *iptr++ = cpu_to_be32(pos); + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: read segment pos %u " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, pos, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.read_chunk_count++; + req->rl_nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Finish Read list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Write list. Supports encoding a list containing + * one array of plain segments that belong to a single write chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Write chunklist (a list of (one) counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Write list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, __be32 *iptr, + enum rpcrdma_chunktype wtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; + + if (wtype != rpcrdma_writech) { + *iptr++ = xdr_zero; /* no Write list present */ + return iptr; + } + + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, + rqst->rq_rcv_buf.head[0].iov_len, + wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + *iptr++ = xdr_one; /* Write list present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); + if (n <= 0) + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: write segment " + "%d@0x016%llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.write_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; + nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Update count of segments in this Write chunk */ + *segcount = cpu_to_be32(nchunks); + + /* Finish Write list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Reply chunk. Supports encoding an array of plain + * segments that belong to a single write (reply) chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO * - * Returns positive RPC/RDMA header size, or negative errno. + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Reply chunk, or an error pointer. */ - -static ssize_t -rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, - struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) +static __be32 * +rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int n, nsegs, nchunks = 0; - unsigned int pos; - struct rpcrdma_mr_seg *seg = req->rl_segments; - struct rpcrdma_read_chunk *cur_rchunk = NULL; - struct rpcrdma_write_array *warray = NULL; - struct rpcrdma_write_chunk *cur_wchunk = NULL; - __be32 *iptr = headerp->rm_body.rm_chunks; - int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); - - if (type == rpcrdma_readch || type == rpcrdma_areadch) { - /* a read chunk - server will RDMA Read our memory */ - cur_rchunk = (struct rpcrdma_read_chunk *) iptr; - } else { - /* a write or reply chunk - server will RDMA Write our memory */ - *iptr++ = xdr_zero; /* encode a NULL read chunk list */ - if (type == rpcrdma_replych) - *iptr++ = xdr_zero; /* a NULL write chunk list */ - warray = (struct rpcrdma_write_array *) iptr; - cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); - } + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; - if (type == rpcrdma_replych || type == rpcrdma_areadch) - pos = 0; - else - pos = target->head[0].iov_len; + if (wtype != rpcrdma_replych) { + *iptr++ = xdr_zero; /* no Reply chunk present */ + return iptr; + } - nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); if (nsegs < 0) - return nsegs; + return ERR_PTR(nsegs); - map = r_xprt->rx_ia.ri_ops->ro_map; + *iptr++ = xdr_one; /* Reply chunk present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; do { - n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); if (n <= 0) - goto out; - if (cur_rchunk) { /* read */ - cur_rchunk->rc_discrim = xdr_one; - /* all read chunks have the same "position" */ - cur_rchunk->rc_position = cpu_to_be32(pos); - cur_rchunk->rc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_rchunk->rc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: read chunk " - "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, pos, n < nsegs ? "more" : "last"); - cur_rchunk++; - r_xprt->rx_stats.read_chunk_count++; - } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_wchunk->wc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: %s chunk " - "elem %d@0x%llx:0x%x (%s)\n", __func__, - (type == rpcrdma_replych) ? "reply" : "write", - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); - cur_wchunk++; - if (type == rpcrdma_replych) - r_xprt->rx_stats.reply_chunk_count++; - else - r_xprt->rx_stats.write_chunk_count++; - r_xprt->rx_stats.total_rdma_request += seg->mr_len; - } + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: reply segment " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.reply_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); + req->rl_nextseg = seg; - /* success. all failures return above */ - req->rl_nchunks = nchunks; - - /* - * finish off header. If write, marshal discrim and nchunks. - */ - if (cur_rchunk) { - iptr = (__be32 *) cur_rchunk; - *iptr++ = xdr_zero; /* finish the read chunk list */ - *iptr++ = xdr_zero; /* encode a NULL write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } else { - warray->wc_discrim = xdr_one; - warray->wc_nchunks = cpu_to_be32(nchunks); - iptr = (__be32 *) cur_wchunk; - if (type == rpcrdma_writech) { - *iptr++ = xdr_zero; /* finish the write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } - } - - /* - * Return header size. - */ - return (unsigned char *)iptr - (unsigned char *)headerp; + /* Update count of segments in the Reply chunk */ + *segcount = cpu_to_be32(nchunks); -out: - for (pos = 0; nchunks--;) - pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[pos]); - return n; + return iptr; } /* @@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) * Marshal a request: the primary job of this routine is to choose * the transfer modes. See comments below. * - * Uses multiple RDMA IOVs for a request: - * [0] -- RPC RDMA header, which uses memory from the *start* of the - * preregistered buffer that already holds the RPC data in - * its middle. - * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. - * [2] -- optional padding. - * [3] -- if padded, header only in [1] and data here. + * Prepares up to two IOVs per Call message: + * + * [0] -- RPC RDMA header + * [1] -- the RPC header/data * * Returns zero on success, otherwise a negative errno. */ @@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - char *base; - size_t rpclen; - ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; + ssize_t hdrlen; + size_t rpclen; + __be32 *iptr; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) return rpcrdma_bc_marshal_reply(rqst); #endif - /* - * rpclen gets amount of data in first buffer, which is the - * pre-registered buffer. - */ - base = rqst->rq_svec[0].iov_base; - rpclen = rqst->rq_svec[0].iov_len; - headerp = rdmab_to_msg(req->rl_rdmabuf); /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; @@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* * Chunks needed for results? * - * o Read ops return data as write chunk(s), header as inline. * o If the expected result is under the inline threshold, all ops * return as inline. + * o Large read ops return data as write chunk(s), header as + * inline. * o Large non-read ops return as a single reply chunk. */ - if (rqst->rq_rcv_buf.flags & XDRBUF_READ) - wtype = rpcrdma_writech; - else if (rpcrdma_results_inline(rqst)) + if (rpcrdma_results_inline(r_xprt, rqst)) wtype = rpcrdma_noch; + else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + wtype = rpcrdma_writech; else wtype = rpcrdma_replych; @@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * that both has a data payload, and whose non-data arguments * by themselves are larger than the inline threshold. */ - if (rpcrdma_args_inline(rqst)) { + if (rpcrdma_args_inline(r_xprt, rqst)) { rtype = rpcrdma_noch; + rpcrdma_inline_pullup(rqst); + rpclen = rqst->rq_svec[0].iov_len; } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; + rpclen = rqst->rq_svec[0].iov_len; + rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); } else { r_xprt->rx_stats.nomsg_call_count++; headerp->rm_type = htonl(RDMA_NOMSG); @@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) rpclen = 0; } - /* The following simplification is not true forever */ - if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) - wtype = rpcrdma_noch; - if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { - dprintk("RPC: %s: cannot marshal multiple chunk lists\n", - __func__); - return -EIO; - } - - hdrlen = RPCRDMA_HDRLEN_MIN; - - /* - * Pull up any extra send data into the preregistered buffer. - * When padding is in use and applies to the transfer, insert - * it and change the message type. + /* This implementation supports the following combinations + * of chunk lists in one RPC-over-RDMA Call message: + * + * - Read list + * - Write list + * - Reply chunk + * - Read list + Reply chunk + * + * It might not yet support the following combinations: + * + * - Read list + Write list + * + * It does not support the following combinations: + * + * - Write list + Reply chunk + * - Read list + Write list + Reply chunk + * + * This implementation supports only a single chunk in each + * Read or Write list. Thus for example the client cannot + * send a Call message with a Position Zero Read chunk and a + * regular Read chunk at the same time. */ - if (rtype == rpcrdma_noch) { - - rpcrdma_inline_pullup(rqst); - - headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; - /* new length after pullup */ - rpclen = rqst->rq_svec[0].iov_len; - } else if (rtype == rpcrdma_readch) - rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); - if (rtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, - headerp, rtype); - wtype = rtype; /* simplify dprintk */ - - } else if (wtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, - headerp, wtype); - } - if (hdrlen < 0) - return hdrlen; + req->rl_nchunks = 0; + req->rl_nextseg = req->rl_segments; + iptr = headerp->rm_body.rm_chunks; + iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; + + if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) + goto out_overflow; + + dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", + rqst->rq_task->tk_pid, __func__, + transfertypes[rtype], transfertypes[wtype], + hdrlen, rpclen); - dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" - " headerp 0x%p base 0x%p lkey 0x%x\n", - __func__, transfertypes[wtype], hdrlen, rpclen, - headerp, base, rdmab_lkey(req->rl_rdmabuf)); - - /* - * initialize send_iov's - normally only two: rdma chunk header and - * single preregistered RPC header buffer, but if padding is present, - * then use a preregistered (and zeroed) pad buffer between the RPC - * header and any write data. In all non-rdma cases, any following - * data has been copied into the RPC header buffer. - */ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); @@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) req->rl_niovs = 2; return 0; + +out_overflow: + pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", + hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); + /* Terminate this RPC. Chunks registered above will be + * released by xprt_release -> xprt_rmda_free . + */ + return -EIO; + +out_unmap: + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + return PTR_ERR(iptr); } /* diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index b1b009f10ea3..99d2e5b72726 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; +static unsigned int min_inline_size = RPCRDMA_MIN_INLINE; +static unsigned int max_inline_size = RPCRDMA_MAX_INLINE; static unsigned int zero; static unsigned int max_padding = PAGE_SIZE; static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; @@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_max_inline_write", @@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_inline_write_padding", @@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) out: dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ + req->rl_task = task; return req->rl_sendbuf->rg_base; out_rdmabuf: @@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer) struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; struct rpcrdma_regbuf *rb; - int i; if (buffer == NULL) return; @@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer) dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - for (i = 0; req->rl_nchunks;) { - --req->rl_nchunks; - i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[i]); - } + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, + !RPC_IS_ASYNC(req->rl_task)); rpcrdma_buffer_put(req); } @@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = { #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, .bc_up = xprt_rdma_bc_up, + .bc_maxpayload = xprt_rdma_bc_maxpayload, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index f5ed9f982cd7..b044d98a1370 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -203,15 +203,6 @@ out_fail: goto out_schedule; } -static void -rpcrdma_flush_cqs(struct rpcrdma_ep *ep) -{ - struct ib_wc wc; - - while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_receive_wc(NULL, &wc); -} - static int rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) { @@ -374,23 +365,6 @@ out: } /* - * Drain any cq, prior to teardown. - */ -static void -rpcrdma_clean_cq(struct ib_cq *cq) -{ - struct ib_wc wc; - int count = 0; - - while (1 == ib_poll_cq(cq, 1, &wc)) - ++count; - - if (count) - dprintk("RPC: %s: flushed %d events (last 0x%x)\n", - __func__, count, wc.opcode); -} - -/* * Exported functions. */ @@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) dprintk("RPC: %s: memory registration strategy is '%s'\n", __func__, ia->ri_ops->ro_displayname); - rwlock_init(&ia->ri_qplock); return 0; out3: @@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, __func__); return -ENOMEM; } - max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; + max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; /* check provider's send/recv wr limits */ if (cdata->max_requests > max_qp_wr) @@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; @@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.recv_cq = recvcq; /* Initialize cma parameters */ + memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); /* RPC/RDMA does not use private data */ ep->rep_remote_cma.private_data = NULL; @@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_remote_cma.responder_resources = ia->ri_device->attrs.max_qp_rd_atom; - ep->rep_remote_cma.retry_count = 7; + /* Limit transport retries so client can detect server + * GID changes quickly. RPC layer handles re-establishing + * transport connection and retransmission. + */ + ep->rep_remote_cma.retry_count = 6; + + /* RPC-over-RDMA handles its own flow control. In addition, + * make all RNR NAKs visible so we know that RPC-over-RDMA + * flow control is working correctly (no NAKs should be seen). + */ ep->rep_remote_cma.flow_control = 0; ep->rep_remote_cma.rnr_retry_count = 0; @@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) cancel_delayed_work_sync(&ep->rep_connect_worker); - if (ia->ri_id->qp) - rpcrdma_ep_disconnect(ep, ia); - - rpcrdma_clean_cq(ep->rep_attr.recv_cq); - rpcrdma_clean_cq(ep->rep_attr.send_cq); - if (ia->ri_id->qp) { + rpcrdma_ep_disconnect(ep, ia); rdma_destroy_qp(ia->ri_id); ia->ri_id->qp = NULL; } @@ -659,7 +639,6 @@ retry: dprintk("RPC: %s: reconnecting...\n", __func__); rpcrdma_ep_disconnect(ep, ia); - rpcrdma_flush_cqs(ep); xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); id = rpcrdma_create_id(xprt, ia, @@ -692,10 +671,8 @@ retry: goto out; } - write_lock(&ia->ri_qplock); old = ia->ri_id; ia->ri_id = id; - write_unlock(&ia->ri_qplock); rdma_destroy_qp(old); rpcrdma_destroy_id(old); @@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { int rc; - rpcrdma_flush_cqs(ep); rc = rdma_disconnect(ia->ri_id); if (!rc) { /* returns without wait if not connected */ @@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); ep->rep_connected = rc; } + + ib_drain_qp(ia->ri_id->qp); } struct rpcrdma_req * @@ -1271,25 +1249,3 @@ out_rc: rpcrdma_recv_buffer_put(rep); return rc; } - -/* How many chunk list items fit within our inline buffers? - */ -unsigned int -rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - int bytes, segments; - - bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize); - bytes -= RPCRDMA_HDRLEN_MIN; - if (bytes < sizeof(struct rpcrdma_segment) * 2) { - pr_warn("RPC: %s: inline threshold too small\n", - __func__); - return 0; - } - - segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1); - dprintk("RPC: %s: max chunk list size = %d segments\n", - __func__, segments); - return segments; -} diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 2ebc743cb96f..95cdc66225ee 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -65,7 +65,6 @@ */ struct rpcrdma_ia { const struct rpcrdma_memreg_ops *ri_ops; - rwlock_t ri_qplock; struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; @@ -73,6 +72,8 @@ struct rpcrdma_ia { struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; + unsigned int ri_max_inline_write; + unsigned int ri_max_inline_read; struct ib_qp_attr ri_qp_attr; struct ib_qp_init_attr ri_qp_init_attr; }; @@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) #define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) +/* To ensure a transport can always make forward progress, + * the number of RDMA segments allowed in header chunk lists + * is capped at 8. This prevents less-capable devices and + * memory registrations from overrunning the Send buffer + * while building chunk lists. + * + * Elements of the Read list take up more room than the + * Write list or Reply chunk. 8 read segments means the Read + * list (or Write list or Reply chunk) cannot consume more + * than + * + * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. + * + * And the fixed part of the header is another 24 bytes. + * + * The smallest inline threshold is 1024 bytes, ensuring that + * at least 750 bytes are available for RPC messages. + */ +#define RPCRDMA_MAX_HDR_SEGS (8) + /* * struct rpcrdma_rep -- this structure encapsulates state required to recv * and complete a reply, asychronously. It needs several pieces of @@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) */ #define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) -#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ + +/* data segments + head/tail for Call + head/tail for Reply */ +#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4) struct rpcrdma_buffer; @@ -198,14 +221,13 @@ enum rpcrdma_frmr_state { }; struct rpcrdma_frmr { - struct scatterlist *sg; - int sg_nents; + struct scatterlist *fr_sg; + int fr_nents; + enum dma_data_direction fr_dir; struct ib_mr *fr_mr; struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; struct completion fr_linv_done; - struct work_struct fr_work; - struct rpcrdma_xprt *fr_xprt; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -222,6 +244,8 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; + struct work_struct mw_work; + struct rpcrdma_xprt *mw_xprt; struct list_head mw_list; struct list_head mw_all; }; @@ -270,12 +294,14 @@ struct rpcrdma_req { unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; + struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct rpcrdma_mr_seg *rl_nextseg; struct ib_cqe rl_cqe; struct list_head rl_all; @@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mr_seg *, int, bool); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct rpcrdma_req *); - int (*ro_unmap)(struct rpcrdma_xprt *, - struct rpcrdma_mr_seg *); + void (*ro_unmap_safe)(struct rpcrdma_xprt *, + struct rpcrdma_req *, bool); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); @@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, void rpcrdma_free_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); -unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); @@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *); * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c */ int rpcrdma_marshal_req(struct rpc_rqst *); +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *, + struct rpcrdma_create_data_internal *, + unsigned int); /* RPC/RDMA module init - xprtrdma/transport.c */ @@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int xprt_rdma_bc_up(struct svc_serv *, struct net *); +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index b90c5397b5e1..2d3e0c42361e 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1364,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) return ret; return 0; } + +static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) +{ + return PAGE_SIZE; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -2661,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, .bc_up = xs_tcp_bc_up, + .bc_maxpayload = xs_tcp_bc_maxpayload, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif |