From f34c67989232dff979ef8747f11b40440a7fca97 Mon Sep 17 00:00:00 2001 From: Jeff Becker Date: Thu, 24 Aug 2017 10:06:10 -0700 Subject: [PATCH] NFS/RDMA backport patch to revert source files to 4.6 kernel in order to facilitate dependency on distro SUNRPC. Include fix to use correct ib_map_mr_sg signature from OFED4.8. Signed-off-by: Jeff Becker --- ofed_scripts/checkout_files | 1 + patches/0016-BACKPORT-nfsrdma-to-4.6.patch | 3682 ++++++++++++++++++++ 2 files changed, 3683 insertions(+) create mode 100644 patches/0016-BACKPORT-nfsrdma-to-4.6.patch diff --git a/ofed_scripts/checkout_files b/ofed_scripts/checkout_files index 21bc220..f1b6a76 100644 --- a/ofed_scripts/checkout_files +++ b/ofed_scripts/checkout_files @@ -25,3 +25,4 @@ drivers/nvme/ include/linux/nvme.h include/linux/nvme-rdma.h include/uapi/linux/nvme_ioctl.h +include/linux/sunrpc/ diff --git a/patches/0016-BACKPORT-nfsrdma-to-4.6.patch b/patches/0016-BACKPORT-nfsrdma-to-4.6.patch new file mode 100644 index 0000000..e0846b2 --- /dev/null +++ b/patches/0016-BACKPORT-nfsrdma-to-4.6.patch @@ -0,0 +1,3682 @@ +diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile +index ef19fa4..dc9f3b5 100644 +--- a/net/sunrpc/xprtrdma/Makefile ++++ b/net/sunrpc/xprtrdma/Makefile +@@ -1,7 +1,7 @@ + obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o + + rpcrdma-y := transport.o rpc_rdma.o verbs.o \ +- fmr_ops.o frwr_ops.o \ ++ fmr_ops.o frwr_ops.o physical_ops.o \ + svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ + svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ + module.o +diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c +index 87762d9..2dcd764 100644 +--- a/net/sunrpc/xprtrdma/backchannel.c ++++ b/net/sunrpc/xprtrdma/backchannel.c +@@ -192,22 +192,6 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) + } + + /** +- * xprt_rdma_bc_maxpayload - Return maximum backchannel message size +- * @xprt: transport +- * +- * Returns maximum size, in bytes, of a backchannel message +- */ +-size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) +-{ +- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); +- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; +- size_t maxmsg; +- +- maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize); +- return maxmsg - RPCRDMA_HDRLEN_MIN; +-} +- +-/** + * rpcrdma_bc_marshal_reply - Send backwards direction reply + * @rqst: buffer containing RPC reply data + * +diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c +index 21cb3b1..b289e10 100644 +--- a/net/sunrpc/xprtrdma/fmr_ops.c ++++ b/net/sunrpc/xprtrdma/fmr_ops.c +@@ -19,6 +19,13 @@ + * verb (fmr_op_unmap). + */ + ++/* Transport recovery ++ * ++ * After a transport reconnect, fmr_op_map re-uses the MR already ++ * allocated for the RPC, but generates a fresh rkey then maps the ++ * MR again. This process is synchronous. ++ */ ++ + #include "xprt_rdma.h" + + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) +@@ -28,151 +35,82 @@ + /* Maximum scatter/gather per FMR */ + #define RPCRDMA_MAX_FMR_SGES (64) + +-/* Access mode of externally registered pages */ +-enum { +- RPCRDMA_FMR_ACCESS_FLAGS = IB_ACCESS_REMOTE_WRITE | +- IB_ACCESS_REMOTE_READ, +-}; ++static int ++fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, ++ struct rpcrdma_create_data_internal *cdata) ++{ ++ return 0; ++} + +-bool +-fmr_is_supported(struct rpcrdma_ia *ia) ++/* FMR mode conveys up to 64 pages of payload per chunk segment. 
++ */ ++static size_t ++fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) + { +- if (!ia->ri_device->alloc_fmr) { +- pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n", +- ia->ri_device->name); +- return false; +- } +- return true; ++ return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, ++ rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES); + } + + static int +-fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw) ++fmr_op_init(struct rpcrdma_xprt *r_xprt) + { +- static struct ib_fmr_attr fmr_attr = { ++ struct rpcrdma_buffer *buf = &r_xprt->rx_buf; ++ int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ; ++ struct ib_fmr_attr fmr_attr = { + .max_pages = RPCRDMA_MAX_FMR_SGES, + .max_maps = 1, + .page_shift = PAGE_SHIFT + }; +- +- mw->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES, +- sizeof(u64), GFP_KERNEL); +- if (!mw->fmr.fm_physaddrs) +- goto out_free; +- +- mw->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES, +- sizeof(*mw->mw_sg), GFP_KERNEL); +- if (!mw->mw_sg) +- goto out_free; +- +- sg_init_table(mw->mw_sg, RPCRDMA_MAX_FMR_SGES); +- +- mw->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS, +- &fmr_attr); +- if (IS_ERR(mw->fmr.fm_mr)) +- goto out_fmr_err; +- ++ struct ib_pd *pd = r_xprt->rx_ia.ri_pd; ++ struct rpcrdma_mw *r; ++ int i, rc; ++ ++ spin_lock_init(&buf->rb_mwlock); ++ INIT_LIST_HEAD(&buf->rb_mws); ++ INIT_LIST_HEAD(&buf->rb_all); ++ ++ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1); ++ i += 2; /* head + tail */ ++ i *= buf->rb_max_requests; /* one set for each RPC slot */ ++ dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i); ++ ++ rc = -ENOMEM; ++ while (i--) { ++ r = kzalloc(sizeof(*r), GFP_KERNEL); ++ if (!r) ++ goto out; ++ ++ r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * ++ sizeof(u64), GFP_KERNEL); ++ if (!r->fmr.physaddrs) ++ goto out_free; ++ ++ r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); ++ if (IS_ERR(r->fmr.fmr)) ++ goto out_fmr_err; ++ ++ list_add(&r->mw_list, &buf->rb_mws); ++ list_add(&r->mw_all, &buf->rb_all); ++ } + return 0; + + out_fmr_err: +- dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__, +- PTR_ERR(mw->fmr.fm_mr)); +- ++ rc = PTR_ERR(r->fmr.fmr); ++ dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); ++ kfree(r->fmr.physaddrs); + out_free: +- kfree(mw->mw_sg); +- kfree(mw->fmr.fm_physaddrs); +- return -ENOMEM; +-} +- +-static int +-__fmr_unmap(struct rpcrdma_mw *mw) +-{ +- LIST_HEAD(l); +- int rc; +- +- list_add(&mw->fmr.fm_mr->list, &l); +- rc = ib_unmap_fmr(&l); +- list_del_init(&mw->fmr.fm_mr->list); +- return rc; +-} +- +-static void +-fmr_op_release_mr(struct rpcrdma_mw *r) +-{ +- LIST_HEAD(unmap_list); +- int rc; +- +- /* Ensure MW is not on any rl_registered list */ +- if (!list_empty(&r->mw_list)) +- list_del(&r->mw_list); +- +- kfree(r->fmr.fm_physaddrs); +- kfree(r->mw_sg); +- +- /* In case this one was left mapped, try to unmap it +- * to prevent dealloc_fmr from failing with EBUSY +- */ +- rc = __fmr_unmap(r); +- if (rc) +- pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n", +- r, rc); +- +- rc = ib_dealloc_fmr(r->fmr.fm_mr); +- if (rc) +- pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n", +- r, rc); +- + kfree(r); +-} +- +-/* Reset of a single FMR. 
+- */ +-static void +-fmr_op_recover_mr(struct rpcrdma_mw *mw) +-{ +- struct rpcrdma_xprt *r_xprt = mw->mw_xprt; +- int rc; +- +- /* ORDER: invalidate first */ +- rc = __fmr_unmap(mw); +- +- /* ORDER: then DMA unmap */ +- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, +- mw->mw_sg, mw->mw_nents, mw->mw_dir); +- if (rc) +- goto out_release; +- +- rpcrdma_put_mw(r_xprt, mw); +- r_xprt->rx_stats.mrs_recovered++; +- return; +- +-out_release: +- pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mw); +- r_xprt->rx_stats.mrs_orphaned++; +- +- spin_lock(&r_xprt->rx_buf.rb_mwlock); +- list_del(&mw->mw_all); +- spin_unlock(&r_xprt->rx_buf.rb_mwlock); +- +- fmr_op_release_mr(mw); ++out: ++ return rc; + } + + static int +-fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, +- struct rpcrdma_create_data_internal *cdata) ++__fmr_unmap(struct rpcrdma_mw *r) + { +- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, +- RPCRDMA_MAX_DATA_SEGS / +- RPCRDMA_MAX_FMR_SGES)); +- return 0; +-} ++ LIST_HEAD(l); + +-/* FMR mode conveys up to 64 pages of payload per chunk segment. +- */ +-static size_t +-fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) +-{ +- return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, +- RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); ++ list_add(&r->fmr.fmr->list, &l); ++ return ib_unmap_fmr(&l); + } + + /* Use the ib_map_phys_fmr() verb to register a memory region +@@ -180,16 +118,27 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) + */ + static int + fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, +- int nsegs, bool writing, struct rpcrdma_mw **out) ++ int nsegs, bool writing) + { ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ struct ib_device *device = ia->ri_device; ++ enum dma_data_direction direction = rpcrdma_data_dir(writing); + struct rpcrdma_mr_seg *seg1 = seg; + int len, pageoff, i, rc; + struct rpcrdma_mw *mw; +- u64 *dma_pages; + +- mw = rpcrdma_get_mw(r_xprt); +- if (!mw) +- return -ENOBUFS; ++ mw = seg1->rl_mw; ++ seg1->rl_mw = NULL; ++ if (!mw) { ++ mw = rpcrdma_get_mw(r_xprt); ++ if (!mw) ++ return -ENOMEM; ++ } else { ++ /* this is a retransmit; generate a fresh rkey */ ++ rc = __fmr_unmap(mw); ++ if (rc) ++ return rc; ++ } + + pageoff = offset_in_page(seg1->mr_offset); + seg1->mr_offset -= pageoff; /* start of page */ +@@ -198,14 +147,8 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + if (nsegs > RPCRDMA_MAX_FMR_SGES) + nsegs = RPCRDMA_MAX_FMR_SGES; + for (i = 0; i < nsegs;) { +- if (seg->mr_page) +- sg_set_page(&mw->mw_sg[i], +- seg->mr_page, +- seg->mr_len, +- offset_in_page(seg->mr_offset)); +- else +- sg_set_buf(&mw->mw_sg[i], seg->mr_offset, +- seg->mr_len); ++ rpcrdma_map_one(device, seg, direction); ++ mw->fmr.physaddrs[i] = seg->mr_dma; + len += seg->mr_len; + ++seg; + ++i; +@@ -214,54 +157,54 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) + break; + } +- mw->mw_nents = i; +- mw->mw_dir = rpcrdma_data_dir(writing); +- if (i == 0) +- goto out_dmamap_err; +- +- if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device, +- mw->mw_sg, mw->mw_nents, mw->mw_dir)) +- goto out_dmamap_err; +- +- for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++) +- dma_pages[i] = sg_dma_address(&mw->mw_sg[i]); +- rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents, +- dma_pages[0]); ++ ++ rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, ++ i, seg1->mr_dma); + if (rc) + goto out_maperr; + +- mw->mw_handle = mw->fmr.fm_mr->rkey; +- 
mw->mw_length = len; +- mw->mw_offset = dma_pages[0] + pageoff; ++ seg1->rl_mw = mw; ++ seg1->mr_rkey = mw->fmr.fmr->rkey; ++ seg1->mr_base = seg1->mr_dma + pageoff; ++ seg1->mr_nsegs = i; ++ seg1->mr_len = len; ++ return i; + +- *out = mw; +- return mw->mw_nents; ++out_maperr: ++ dprintk("RPC: %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", ++ __func__, len, (unsigned long long)seg1->mr_dma, ++ pageoff, i, rc); ++ while (i--) ++ rpcrdma_unmap_one(device, --seg); ++ return rc; ++} + +-out_dmamap_err: +- pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", +- mw->mw_sg, mw->mw_nents); +- rpcrdma_defer_mr_recovery(mw); +- return -EIO; ++static void ++__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) ++{ ++ struct ib_device *device = r_xprt->rx_ia.ri_device; ++ struct rpcrdma_mw *mw = seg->rl_mw; ++ int nsegs = seg->mr_nsegs; + +-out_maperr: +- pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", +- len, (unsigned long long)dma_pages[0], +- pageoff, mw->mw_nents, rc); +- rpcrdma_defer_mr_recovery(mw); +- return -EIO; ++ seg->rl_mw = NULL; ++ ++ while (nsegs--) ++ rpcrdma_unmap_one(device, seg++); ++ ++ rpcrdma_put_mw(r_xprt, mw); + } + + /* Invalidate all memory regions that were registered for "req". + * + * Sleeps until it is safe for the host CPU to access the + * previously mapped memory regions. +- * +- * Caller ensures that req->rl_registered is not empty. + */ + static void + fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + { +- struct rpcrdma_mw *mw, *tmp; ++ struct rpcrdma_mr_seg *seg; ++ unsigned int i, nchunks; ++ struct rpcrdma_mw *mw; + LIST_HEAD(unmap_list); + int rc; + +@@ -270,65 +213,92 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + /* ORDER: Invalidate all of the req's MRs first + * + * ib_unmap_fmr() is slow, so use a single call instead +- * of one call per mapped FMR. ++ * of one call per mapped MR. + */ +- list_for_each_entry(mw, &req->rl_registered, mw_list) +- list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); ++ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { ++ seg = &req->rl_segments[i]; ++ mw = seg->rl_mw; ++ ++ list_add(&mw->fmr.fmr->list, &unmap_list); ++ ++ i += seg->mr_nsegs; ++ } + rc = ib_unmap_fmr(&unmap_list); + if (rc) +- goto out_reset; ++ pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc); + + /* ORDER: Now DMA unmap all of the req's MRs, and return + * them to the free MW list. + */ +- list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { +- list_del_init(&mw->mw_list); +- list_del_init(&mw->fmr.fm_mr->list); +- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, +- mw->mw_sg, mw->mw_nents, mw->mw_dir); +- rpcrdma_put_mw(r_xprt, mw); +- } +- +- return; ++ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { ++ seg = &req->rl_segments[i]; + +-out_reset: +- pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc); ++ __fmr_dma_unmap(r_xprt, seg); + +- list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { +- list_del_init(&mw->fmr.fm_mr->list); +- fmr_op_recover_mr(mw); ++ i += seg->mr_nsegs; ++ seg->mr_nsegs = 0; + } ++ ++ req->rl_nchunks = 0; + } + +-/* Use a slow, safe mechanism to invalidate all memory regions +- * that were registered for "req". ++/* Use the ib_unmap_fmr() verb to prevent further remote ++ * access via RDMA READ or RDMA WRITE. 
+ */ ++static int ++fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) ++{ ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ struct rpcrdma_mr_seg *seg1 = seg; ++ struct rpcrdma_mw *mw = seg1->rl_mw; ++ int rc, nsegs = seg->mr_nsegs; ++ ++ dprintk("RPC: %s: FMR %p\n", __func__, mw); ++ ++ seg1->rl_mw = NULL; ++ while (seg1->mr_nsegs--) ++ rpcrdma_unmap_one(ia->ri_device, seg++); ++ rc = __fmr_unmap(mw); ++ if (rc) ++ goto out_err; ++ rpcrdma_put_mw(r_xprt, mw); ++ return nsegs; ++ ++out_err: ++ /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy ++ * will attempt to release it when the transport is destroyed. ++ */ ++ dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc); ++ return nsegs; ++} ++ + static void +-fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, +- bool sync) ++fmr_op_destroy(struct rpcrdma_buffer *buf) + { +- struct rpcrdma_mw *mw; ++ struct rpcrdma_mw *r; ++ int rc; ++ ++ while (!list_empty(&buf->rb_all)) { ++ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); ++ list_del(&r->mw_all); ++ kfree(r->fmr.physaddrs); + +- while (!list_empty(&req->rl_registered)) { +- mw = list_first_entry(&req->rl_registered, +- struct rpcrdma_mw, mw_list); +- list_del_init(&mw->mw_list); ++ rc = ib_dealloc_fmr(r->fmr.fmr); ++ if (rc) ++ dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", ++ __func__, rc); + +- if (sync) +- fmr_op_recover_mr(mw); +- else +- rpcrdma_defer_mr_recovery(mw); ++ kfree(r); + } + } + + const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { + .ro_map = fmr_op_map, + .ro_unmap_sync = fmr_op_unmap_sync, +- .ro_unmap_safe = fmr_op_unmap_safe, +- .ro_recover_mr = fmr_op_recover_mr, ++ .ro_unmap = fmr_op_unmap, + .ro_open = fmr_op_open, + .ro_maxpages = fmr_op_maxpages, +- .ro_init_mr = fmr_op_init_mr, +- .ro_release_mr = fmr_op_release_mr, ++ .ro_init = fmr_op_init, ++ .ro_destroy = fmr_op_destroy, + .ro_displayname = "fmr", + }; +diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c +index 892b5e1..c250924 100644 +--- a/net/sunrpc/xprtrdma/frwr_ops.c ++++ b/net/sunrpc/xprtrdma/frwr_ops.c +@@ -73,40 +73,93 @@ + # define RPCDBG_FACILITY RPCDBG_TRANS + #endif + +-bool +-frwr_is_supported(struct rpcrdma_ia *ia) ++static struct workqueue_struct *frwr_recovery_wq; ++ ++#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM) ++ ++int ++frwr_alloc_recovery_wq(void) ++{ ++ frwr_recovery_wq = alloc_workqueue("frwr_recovery", ++ FRWR_RECOVERY_WQ_FLAGS, 0); ++ return !frwr_recovery_wq ? -ENOMEM : 0; ++} ++ ++void ++frwr_destroy_recovery_wq(void) ++{ ++ struct workqueue_struct *wq; ++ ++ if (!frwr_recovery_wq) ++ return; ++ ++ wq = frwr_recovery_wq; ++ frwr_recovery_wq = NULL; ++ destroy_workqueue(wq); ++} ++ ++/* Deferred reset of a single FRMR. Generate a fresh rkey by ++ * replacing the MR. ++ * ++ * There's no recovery if this fails. The FRMR is abandoned, but ++ * remains in rb_all. It will be cleaned up when the transport is ++ * destroyed. 
++ */ ++static void ++__frwr_recovery_worker(struct work_struct *work) + { +- struct ib_device_attr *attrs = &ia->ri_device->attrs; +- +- if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) +- goto out_not_supported; +- if (attrs->max_fast_reg_page_list_len == 0) +- goto out_not_supported; +- return true; +- +-out_not_supported: +- pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n", +- ia->ri_device->name); +- return false; ++ struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, ++ frmr.fr_work); ++ struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; ++ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; ++ struct ib_pd *pd = r_xprt->rx_ia.ri_pd; ++ ++ if (ib_dereg_mr(r->frmr.fr_mr)) ++ goto out_fail; ++ ++ r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); ++ if (IS_ERR(r->frmr.fr_mr)) ++ goto out_fail; ++ ++ dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); ++ r->frmr.fr_state = FRMR_IS_INVALID; ++ rpcrdma_put_mw(r_xprt, r); ++ return; ++ ++out_fail: ++ pr_warn("RPC: %s: FRMR %p unrecovered\n", ++ __func__, r); ++} ++ ++/* A broken MR was discovered in a context that can't sleep. ++ * Defer recovery to the recovery worker. ++ */ ++static void ++__frwr_queue_recovery(struct rpcrdma_mw *r) ++{ ++ INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); ++ queue_work(frwr_recovery_wq, &r->frmr.fr_work); + } + + static int +-frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) ++__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, ++ unsigned int depth) + { +- unsigned int depth = ia->ri_max_frmr_depth; + struct rpcrdma_frmr *f = &r->frmr; + int rc; + +- f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, depth); ++ f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(f->fr_mr)) + goto out_mr_err; + +- r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL); +- if (!r->mw_sg) ++ f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); ++ if (!f->sg) + goto out_list_err; + +- sg_init_table(r->mw_sg, depth); ++ sg_init_table(f->sg, depth); ++ + init_completion(&f->fr_linv_done); ++ + return 0; + + out_mr_err: +@@ -124,79 +177,15 @@ out_list_err: + } + + static void +-frwr_op_release_mr(struct rpcrdma_mw *r) ++__frwr_release(struct rpcrdma_mw *r) + { + int rc; + +- /* Ensure MW is not on any rl_registered list */ +- if (!list_empty(&r->mw_list)) +- list_del(&r->mw_list); +- + rc = ib_dereg_mr(r->frmr.fr_mr); + if (rc) +- pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n", +- r, rc); +- kfree(r->mw_sg); +- kfree(r); +-} +- +-static int +-__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +-{ +- struct rpcrdma_frmr *f = &r->frmr; +- int rc; +- +- rc = ib_dereg_mr(f->fr_mr); +- if (rc) { +- pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", +- rc, r); +- return rc; +- } +- +- f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, +- ia->ri_max_frmr_depth); +- if (IS_ERR(f->fr_mr)) { +- pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", +- PTR_ERR(f->fr_mr), r); +- return PTR_ERR(f->fr_mr); +- } +- +- dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); +- f->fr_state = FRMR_IS_INVALID; +- return 0; +-} +- +-/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR. +- * +- * There's no recovery if this fails. The FRMR is abandoned, but +- * remains in rb_all. It will be cleaned up when the transport is +- * destroyed. 
+- */ +-static void +-frwr_op_recover_mr(struct rpcrdma_mw *mw) +-{ +- struct rpcrdma_xprt *r_xprt = mw->mw_xprt; +- struct rpcrdma_ia *ia = &r_xprt->rx_ia; +- int rc; +- +- rc = __frwr_reset_mr(ia, mw); +- ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); +- if (rc) +- goto out_release; +- +- rpcrdma_put_mw(r_xprt, mw); +- r_xprt->rx_stats.mrs_recovered++; +- return; +- +-out_release: +- pr_err("rpcrdma: FRMR reset failed %d, %p release\n", rc, mw); +- r_xprt->rx_stats.mrs_orphaned++; +- +- spin_lock(&r_xprt->rx_buf.rb_mwlock); +- list_del(&mw->mw_all); +- spin_unlock(&r_xprt->rx_buf.rb_mwlock); +- +- frwr_op_release_mr(mw); ++ dprintk("RPC: %s: ib_dereg_mr status %i\n", ++ __func__, rc); ++ kfree(r->frmr.sg); + } + + static int +@@ -242,9 +231,6 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, + depth; + } + +- rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, +- RPCRDMA_MAX_DATA_SEGS / +- ia->ri_max_frmr_depth)); + return 0; + } + +@@ -257,7 +243,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + + return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, +- RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth); ++ rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); + } + + static void +@@ -332,14 +318,57 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) + complete_all(&frmr->fr_linv_done); + } + +-/* Post a REG_MR Work Request to register a memory region ++static int ++frwr_op_init(struct rpcrdma_xprt *r_xprt) ++{ ++ struct rpcrdma_buffer *buf = &r_xprt->rx_buf; ++ struct ib_device *device = r_xprt->rx_ia.ri_device; ++ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; ++ struct ib_pd *pd = r_xprt->rx_ia.ri_pd; ++ int i; ++ ++ spin_lock_init(&buf->rb_mwlock); ++ INIT_LIST_HEAD(&buf->rb_mws); ++ INIT_LIST_HEAD(&buf->rb_all); ++ ++ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1); ++ i += 2; /* head + tail */ ++ i *= buf->rb_max_requests; /* one set for each RPC slot */ ++ dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i); ++ ++ while (i--) { ++ struct rpcrdma_mw *r; ++ int rc; ++ ++ r = kzalloc(sizeof(*r), GFP_KERNEL); ++ if (!r) ++ return -ENOMEM; ++ ++ rc = __frwr_init(r, pd, device, depth); ++ if (rc) { ++ kfree(r); ++ return rc; ++ } ++ ++ list_add(&r->mw_list, &buf->rb_mws); ++ list_add(&r->mw_all, &buf->rb_all); ++ r->frmr.fr_xprt = r_xprt; ++ } ++ ++ return 0; ++} ++ ++/* Post a FAST_REG Work Request to register a memory region + * for remote access via RDMA READ or RDMA WRITE. 
+ */ + static int + frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, +- int nsegs, bool writing, struct rpcrdma_mw **out) ++ int nsegs, bool writing) + { + struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ struct ib_device *device = ia->ri_device; ++ enum dma_data_direction direction = rpcrdma_data_dir(writing); ++ struct rpcrdma_mr_seg *seg1 = seg; + struct rpcrdma_mw *mw; + struct rpcrdma_frmr *frmr; + struct ib_mr *mr; +@@ -348,13 +377,14 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + int rc, i, n, dma_nents; + u8 key; + +- mw = NULL; ++ mw = seg1->rl_mw; ++ seg1->rl_mw = NULL; + do { + if (mw) +- rpcrdma_defer_mr_recovery(mw); ++ __frwr_queue_recovery(mw); + mw = rpcrdma_get_mw(r_xprt); + if (!mw) +- return -ENOBUFS; ++ return -ENOMEM; + } while (mw->frmr.fr_state != FRMR_IS_INVALID); + frmr = &mw->frmr; + frmr->fr_state = FRMR_IS_VALID; +@@ -363,14 +393,15 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + + if (nsegs > ia->ri_max_frmr_depth) + nsegs = ia->ri_max_frmr_depth; ++ + for (i = 0; i < nsegs;) { + if (seg->mr_page) +- sg_set_page(&mw->mw_sg[i], ++ sg_set_page(&frmr->sg[i], + seg->mr_page, + seg->mr_len, + offset_in_page(seg->mr_offset)); + else +- sg_set_buf(&mw->mw_sg[i], seg->mr_offset, ++ sg_set_buf(&frmr->sg[i], seg->mr_offset, + seg->mr_len); + + ++seg; +@@ -381,22 +412,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) + break; + } +- mw->mw_nents = i; +- mw->mw_dir = rpcrdma_data_dir(writing); +- if (i == 0) +- goto out_dmamap_err; ++ frmr->sg_nents = i; + +- dma_nents = ib_dma_map_sg(ia->ri_device, +- mw->mw_sg, mw->mw_nents, mw->mw_dir); +- if (!dma_nents) +- goto out_dmamap_err; ++ dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); ++ if (!dma_nents) { ++ pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", ++ __func__, frmr->sg, frmr->sg_nents); ++ return -ENOMEM; ++ } + +- n = ib_map_mr_sg(mr, mw->mw_sg, mw->mw_nents, NULL, PAGE_SIZE); +- if (unlikely(n != mw->mw_nents)) +- goto out_mapmr_err; ++ n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); ++ if (unlikely(n != frmr->sg_nents)) { ++ pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", ++ __func__, frmr->fr_mr, n, frmr->sg_nents); ++ rc = n < 0 ? 
n : -EINVAL; ++ goto out_senderr; ++ } + + dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", +- __func__, mw, mw->mw_nents, mr->length); ++ __func__, mw, frmr->sg_nents, mr->length); + + key = (u8)(mr->rkey & 0x000000FF); + ib_update_fast_reg_key(mr, ++key); +@@ -418,34 +452,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + if (rc) + goto out_senderr; + +- mw->mw_handle = mr->rkey; +- mw->mw_length = mr->length; +- mw->mw_offset = mr->iova; +- +- *out = mw; +- return mw->mw_nents; +- +-out_dmamap_err: +- pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", +- mw->mw_sg, mw->mw_nents); +- rpcrdma_defer_mr_recovery(mw); +- return -EIO; ++ seg1->mr_dir = direction; ++ seg1->rl_mw = mw; ++ seg1->mr_rkey = mr->rkey; ++ seg1->mr_base = mr->iova; ++ seg1->mr_nsegs = frmr->sg_nents; ++ seg1->mr_len = mr->length; + +-out_mapmr_err: +- pr_err("rpcrdma: failed to map mr %p (%u/%u)\n", +- frmr->fr_mr, n, mw->mw_nents); +- rpcrdma_defer_mr_recovery(mw); +- return -EIO; ++ return frmr->sg_nents; + + out_senderr: +- pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc); +- rpcrdma_defer_mr_recovery(mw); +- return -ENOTCONN; ++ dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); ++ ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction); ++ __frwr_queue_recovery(mw); ++ return rc; + } + + static struct ib_send_wr * +-__frwr_prepare_linv_wr(struct rpcrdma_mw *mw) ++__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) + { ++ struct rpcrdma_mw *mw = seg->rl_mw; + struct rpcrdma_frmr *f = &mw->frmr; + struct ib_send_wr *invalidate_wr; + +@@ -461,19 +487,36 @@ __frwr_prepare_linv_wr(struct rpcrdma_mw *mw) + return invalidate_wr; + } + ++static void ++__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, ++ int rc) ++{ ++ struct ib_device *device = r_xprt->rx_ia.ri_device; ++ struct rpcrdma_mw *mw = seg->rl_mw; ++ struct rpcrdma_frmr *f = &mw->frmr; ++ ++ seg->rl_mw = NULL; ++ ++ ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir); ++ ++ if (!rc) ++ rpcrdma_put_mw(r_xprt, mw); ++ else ++ __frwr_queue_recovery(mw); ++} ++ + /* Invalidate all memory regions that were registered for "req". + * + * Sleeps until it is safe for the host CPU to access the + * previously mapped memory regions. +- * +- * Caller ensures that req->rl_registered is not empty. + */ + static void + frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + { + struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; +- struct rpcrdma_mw *mw, *tmp; ++ struct rpcrdma_mr_seg *seg; ++ unsigned int i, nchunks; + struct rpcrdma_frmr *f; + int rc; + +@@ -484,18 +527,22 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + * Chain the LOCAL_INV Work Requests and post them with + * a single ib_post_send() call. 
+ */ +- f = NULL; + invalidate_wrs = pos = prev = NULL; +- list_for_each_entry(mw, &req->rl_registered, mw_list) { +- pos = __frwr_prepare_linv_wr(mw); ++ seg = NULL; ++ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { ++ seg = &req->rl_segments[i]; ++ ++ pos = __frwr_prepare_linv_wr(seg); + + if (!invalidate_wrs) + invalidate_wrs = pos; + else + prev->next = pos; + prev = pos; +- f = &mw->frmr; ++ ++ i += seg->mr_nsegs; + } ++ f = &seg->rl_mw->frmr; + + /* Strong send queue ordering guarantees that when the + * last WR in the chain completes, all WRs in the chain +@@ -511,8 +558,11 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + * unless ri_id->qp is a valid pointer. + */ + rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); +- if (rc) +- goto reset_mrs; ++ if (rc) { ++ pr_warn("%s: ib_post_send failed %i\n", __func__, rc); ++ rdma_disconnect(ia->ri_id); ++ goto unmap; ++ } + + wait_for_completion(&f->fr_linv_done); + +@@ -520,60 +570,83 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) + * them to the free MW list. + */ + unmap: +- list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { +- list_del_init(&mw->mw_list); +- ib_dma_unmap_sg(ia->ri_device, +- mw->mw_sg, mw->mw_nents, mw->mw_dir); +- rpcrdma_put_mw(r_xprt, mw); +- } +- return; ++ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { ++ seg = &req->rl_segments[i]; + +-reset_mrs: +- pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc); +- rdma_disconnect(ia->ri_id); ++ __frwr_dma_unmap(r_xprt, seg, rc); + +- /* Find and reset the MRs in the LOCAL_INV WRs that did not +- * get posted. This is synchronous, and slow. +- */ +- list_for_each_entry(mw, &req->rl_registered, mw_list) { +- f = &mw->frmr; +- if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { +- __frwr_reset_mr(ia, mw); +- bad_wr = bad_wr->next; +- } ++ i += seg->mr_nsegs; ++ seg->mr_nsegs = 0; + } +- goto unmap; ++ ++ req->rl_nchunks = 0; + } + +-/* Use a slow, safe mechanism to invalidate all memory regions +- * that were registered for "req". ++/* Post a LOCAL_INV Work Request to prevent further remote access ++ * via RDMA READ or RDMA WRITE. 
+ */ ++static int ++frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) ++{ ++ struct rpcrdma_mr_seg *seg1 = seg; ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ struct rpcrdma_mw *mw = seg1->rl_mw; ++ struct rpcrdma_frmr *frmr = &mw->frmr; ++ struct ib_send_wr *invalidate_wr, *bad_wr; ++ int rc, nsegs = seg->mr_nsegs; ++ ++ dprintk("RPC: %s: FRMR %p\n", __func__, mw); ++ ++ seg1->rl_mw = NULL; ++ frmr->fr_state = FRMR_IS_INVALID; ++ invalidate_wr = &mw->frmr.fr_invwr; ++ ++ memset(invalidate_wr, 0, sizeof(*invalidate_wr)); ++ frmr->fr_cqe.done = frwr_wc_localinv; ++ invalidate_wr->wr_cqe = &frmr->fr_cqe; ++ invalidate_wr->opcode = IB_WR_LOCAL_INV; ++ invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; ++ DECR_CQCOUNT(&r_xprt->rx_ep); ++ ++ ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); ++ read_lock(&ia->ri_qplock); ++ rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); ++ read_unlock(&ia->ri_qplock); ++ if (rc) ++ goto out_err; ++ ++ rpcrdma_put_mw(r_xprt, mw); ++ return nsegs; ++ ++out_err: ++ dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); ++ __frwr_queue_recovery(mw); ++ return nsegs; ++} ++ + static void +-frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, +- bool sync) ++frwr_op_destroy(struct rpcrdma_buffer *buf) + { +- struct rpcrdma_mw *mw; ++ struct rpcrdma_mw *r; + +- while (!list_empty(&req->rl_registered)) { +- mw = list_first_entry(&req->rl_registered, +- struct rpcrdma_mw, mw_list); +- list_del_init(&mw->mw_list); ++ /* Ensure stale MWs for "buf" are no longer in flight */ ++ flush_workqueue(frwr_recovery_wq); + +- if (sync) +- frwr_op_recover_mr(mw); +- else +- rpcrdma_defer_mr_recovery(mw); ++ while (!list_empty(&buf->rb_all)) { ++ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); ++ list_del(&r->mw_all); ++ __frwr_release(r); ++ kfree(r); + } + } + + const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { + .ro_map = frwr_op_map, + .ro_unmap_sync = frwr_op_unmap_sync, +- .ro_unmap_safe = frwr_op_unmap_safe, +- .ro_recover_mr = frwr_op_recover_mr, ++ .ro_unmap = frwr_op_unmap, + .ro_open = frwr_op_open, + .ro_maxpages = frwr_op_maxpages, +- .ro_init_mr = frwr_op_init_mr, +- .ro_release_mr = frwr_op_release_mr, ++ .ro_init = frwr_op_init, ++ .ro_destroy = frwr_op_destroy, + .ro_displayname = "frwr", + }; +diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c +new file mode 100644 +index 0000000..481b9b6 +--- /dev/null ++++ b/net/sunrpc/xprtrdma/physical_ops.c +@@ -0,0 +1,111 @@ ++/* ++ * Copyright (c) 2015 Oracle. All rights reserved. ++ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. ++ */ ++ ++/* No-op chunk preparation. All client memory is pre-registered. ++ * Sometimes referred to as ALLPHYSICAL mode. ++ * ++ * Physical registration is simple because all client memory is ++ * pre-registered and never deregistered. This mode is good for ++ * adapter bring up, but is considered not safe: the server is ++ * trusted not to abuse its access to client memory not involved ++ * in RDMA I/O. ++ */ ++ ++#include "xprt_rdma.h" ++ ++#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) ++# define RPCDBG_FACILITY RPCDBG_TRANS ++#endif ++ ++static int ++physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, ++ struct rpcrdma_create_data_internal *cdata) ++{ ++ struct ib_mr *mr; ++ ++ /* Obtain an rkey to use for RPC data payloads. 
++ */ ++ mr = ib_get_dma_mr(ia->ri_pd, ++ IB_ACCESS_LOCAL_WRITE | ++ IB_ACCESS_REMOTE_WRITE | ++ IB_ACCESS_REMOTE_READ); ++ if (IS_ERR(mr)) { ++ pr_err("%s: ib_get_dma_mr for failed with %lX\n", ++ __func__, PTR_ERR(mr)); ++ return -ENOMEM; ++ } ++ ++ ia->ri_dma_mr = mr; ++ return 0; ++} ++ ++/* PHYSICAL memory registration conveys one page per chunk segment. ++ */ ++static size_t ++physical_op_maxpages(struct rpcrdma_xprt *r_xprt) ++{ ++ return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, ++ rpcrdma_max_segments(r_xprt)); ++} ++ ++static int ++physical_op_init(struct rpcrdma_xprt *r_xprt) ++{ ++ return 0; ++} ++ ++/* The client's physical memory is already exposed for ++ * remote access via RDMA READ or RDMA WRITE. ++ */ ++static int ++physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, ++ int nsegs, bool writing) ++{ ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ ++ rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing)); ++ seg->mr_rkey = ia->ri_dma_mr->rkey; ++ seg->mr_base = seg->mr_dma; ++ return 1; ++} ++ ++/* Unmap a memory region, but leave it registered. ++ */ ++static int ++physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) ++{ ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ ++ rpcrdma_unmap_one(ia->ri_device, seg); ++ return 1; ++} ++ ++/* DMA unmap all memory regions that were mapped for "req". ++ */ ++static void ++physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) ++{ ++ struct ib_device *device = r_xprt->rx_ia.ri_device; ++ unsigned int i; ++ ++ for (i = 0; req->rl_nchunks; --req->rl_nchunks) ++ rpcrdma_unmap_one(device, &req->rl_segments[i++]); ++} ++ ++static void ++physical_op_destroy(struct rpcrdma_buffer *buf) ++{ ++} ++ ++const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { ++ .ro_map = physical_op_map, ++ .ro_unmap_sync = physical_op_unmap_sync, ++ .ro_unmap = physical_op_unmap, ++ .ro_open = physical_op_open, ++ .ro_maxpages = physical_op_maxpages, ++ .ro_init = physical_op_init, ++ .ro_destroy = physical_op_destroy, ++ .ro_displayname = "physical", ++}; +diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c +index a47f170..888823b 100644 +--- a/net/sunrpc/xprtrdma/rpc_rdma.c ++++ b/net/sunrpc/xprtrdma/rpc_rdma.c +@@ -61,84 +61,26 @@ enum rpcrdma_chunktype { + rpcrdma_replych + }; + ++#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) + static const char transfertypes[][12] = { +- "inline", /* no chunks */ +- "read list", /* some argument via rdma read */ +- "*read list", /* entire request via rdma read */ +- "write list", /* some result via rdma write */ ++ "pure inline", /* no chunks */ ++ " read chunk", /* some argument via rdma read */ ++ "*read chunk", /* entire request via rdma read */ ++ "write chunk", /* some result via rdma write */ + "reply chunk" /* entire reply via rdma write */ + }; +- +-/* Returns size of largest RPC-over-RDMA header in a Call message +- * +- * The largest Call header contains a full-size Read list and a +- * minimal Reply chunk. 
+- */ +-static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) +-{ +- unsigned int size; +- +- /* Fixed header fields and list discriminators */ +- size = RPCRDMA_HDRLEN_MIN; +- +- /* Maximum Read list size */ +- maxsegs += 2; /* segment for head and tail buffers */ +- size = maxsegs * sizeof(struct rpcrdma_read_chunk); +- +- /* Minimal Read chunk size */ +- size += sizeof(__be32); /* segment count */ +- size += sizeof(struct rpcrdma_segment); +- size += sizeof(__be32); /* list discriminator */ +- +- dprintk("RPC: %s: max call header size = %u\n", +- __func__, size); +- return size; +-} +- +-/* Returns size of largest RPC-over-RDMA header in a Reply message +- * +- * There is only one Write list or one Reply chunk per Reply +- * message. The larger list is the Write list. +- */ +-static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) +-{ +- unsigned int size; +- +- /* Fixed header fields and list discriminators */ +- size = RPCRDMA_HDRLEN_MIN; +- +- /* Maximum Write list size */ +- maxsegs += 2; /* segment for head and tail buffers */ +- size = sizeof(__be32); /* segment count */ +- size += maxsegs * sizeof(struct rpcrdma_segment); +- size += sizeof(__be32); /* list discriminator */ +- +- dprintk("RPC: %s: max reply header size = %u\n", +- __func__, size); +- return size; +-} +- +-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia, +- struct rpcrdma_create_data_internal *cdata, +- unsigned int maxsegs) +-{ +- ia->ri_max_inline_write = cdata->inline_wsize - +- rpcrdma_max_call_header_size(maxsegs); +- ia->ri_max_inline_read = cdata->inline_rsize - +- rpcrdma_max_reply_header_size(maxsegs); +-} ++#endif + + /* The client can send a request inline as long as the RPCRDMA header + * plus the RPC call fit under the transport's inline limit. If the + * combined call message size exceeds that limit, the client must use + * the read chunk list for this operation. + */ +-static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, +- struct rpc_rqst *rqst) ++static bool rpcrdma_args_inline(struct rpc_rqst *rqst) + { +- struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + +- return rqst->rq_snd_buf.len <= ia->ri_max_inline_write; ++ return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + } + + /* The client can't know how large the actual reply will be. Thus it +@@ -147,12 +89,11 @@ static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, + * limit, the client must provide a write list or a reply chunk for + * this request. + */ +-static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, +- struct rpc_rqst *rqst) ++static bool rpcrdma_results_inline(struct rpc_rqst *rqst) + { +- struct rpcrdma_ia *ia = &r_xprt->rx_ia; ++ unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + +- return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; ++ return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); + } + + static int +@@ -196,7 +137,8 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) + * MR when they can. 
+ */ + static int +-rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) ++rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, ++ int n, int nsegs) + { + size_t page_offset; + u32 remaining; +@@ -205,7 +147,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) + base = vec->iov_base; + page_offset = offset_in_page(base); + remaining = vec->iov_len; +- while (remaining && n < RPCRDMA_MAX_SEGS) { ++ while (remaining && n < nsegs) { + seg[n].mr_page = NULL; + seg[n].mr_offset = base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); +@@ -229,34 +171,34 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) + + static int + rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, +- enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) ++ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs) + { +- int len, n, p, page_base; ++ int len, n = 0, p; ++ int page_base; + struct page **ppages; + +- n = 0; + if (pos == 0) { +- n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n); +- if (n == RPCRDMA_MAX_SEGS) +- goto out_overflow; ++ n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); ++ if (n == nsegs) ++ return -EIO; + } + + len = xdrbuf->page_len; + ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); + page_base = xdrbuf->page_base & ~PAGE_MASK; + p = 0; +- while (len && n < RPCRDMA_MAX_SEGS) { ++ while (len && n < nsegs) { + if (!ppages[p]) { + /* alloc the pagelist for receiving buffer */ + ppages[p] = alloc_page(GFP_ATOMIC); + if (!ppages[p]) +- return -EAGAIN; ++ return -ENOMEM; + } + seg[n].mr_page = ppages[p]; + seg[n].mr_offset = (void *)(unsigned long) page_base; + seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); + if (seg[n].mr_len > PAGE_SIZE) +- goto out_overflow; ++ return -EIO; + len -= seg[n].mr_len; + ++n; + ++p; +@@ -264,8 +206,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, + } + + /* Message overflows the seg array */ +- if (len && n == RPCRDMA_MAX_SEGS) +- goto out_overflow; ++ if (len && n == nsegs) ++ return -EIO; + + /* When encoding the read list, the tail is always sent inline */ + if (type == rpcrdma_readch) +@@ -276,28 +218,31 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, + * xdr pad bytes, saving the server an RDMA operation. */ + if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) + return n; +- n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n); +- if (n == RPCRDMA_MAX_SEGS) +- goto out_overflow; ++ n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); ++ if (n == nsegs) ++ return -EIO; + } + + return n; +- +-out_overflow: +- pr_err("rpcrdma: segment array overflow\n"); +- return -EIO; +-} +- +-static inline __be32 * +-xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) +-{ +- *iptr++ = cpu_to_be32(mw->mw_handle); +- *iptr++ = cpu_to_be32(mw->mw_length); +- return xdr_encode_hyper(iptr, mw->mw_offset); + } + +-/* XDR-encode the Read list. Supports encoding a list of read +- * segments that belong to a single read chunk. ++/* ++ * Create read/write chunk lists, and reply chunks, for RDMA ++ * ++ * Assume check against THRESHOLD has been done, and chunks are required. ++ * Assume only encoding one list entry for read|write chunks. The NFSv3 ++ * protocol is simple enough to allow this as it only has a single "bulk ++ * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The ++ * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) 
++ * ++ * When used for a single reply chunk (which is a special write ++ * chunk used for the entire reply, rather than just the data), it ++ * is used primarily for READDIR and READLINK which would otherwise ++ * be severely size-limited by a small rdma inline read max. The server ++ * response will come back as an RDMA Write, followed by a message ++ * of type RDMA_NOMSG carrying the xid and length. As a result, reply ++ * chunks do not provide data alignment, however they do not require ++ * "fixup" (moving the response to the upper layer buffer) either. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * +@@ -305,190 +250,131 @@ xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) + * N elements, position P (same P for all chunks of same arg!): + * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 + * +- * Returns a pointer to the XDR word in the RDMA header following +- * the end of the Read list, or an error pointer. +- */ +-static __be32 * +-rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, +- struct rpcrdma_req *req, struct rpc_rqst *rqst, +- __be32 *iptr, enum rpcrdma_chunktype rtype) +-{ +- struct rpcrdma_mr_seg *seg; +- struct rpcrdma_mw *mw; +- unsigned int pos; +- int n, nsegs; +- +- if (rtype == rpcrdma_noch) { +- *iptr++ = xdr_zero; /* item not present */ +- return iptr; +- } +- +- pos = rqst->rq_snd_buf.head[0].iov_len; +- if (rtype == rpcrdma_areadch) +- pos = 0; +- seg = req->rl_segments; +- nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg); +- if (nsegs < 0) +- return ERR_PTR(nsegs); +- +- do { +- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, +- false, &mw); +- if (n < 0) +- return ERR_PTR(n); +- list_add(&mw->mw_list, &req->rl_registered); +- +- *iptr++ = xdr_one; /* item present */ +- +- /* All read segments in this chunk +- * have the same "position". +- */ +- *iptr++ = cpu_to_be32(pos); +- iptr = xdr_encode_rdma_segment(iptr, mw); +- +- dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", +- rqst->rq_task->tk_pid, __func__, pos, +- mw->mw_length, (unsigned long long)mw->mw_offset, +- mw->mw_handle, n < nsegs ? "more" : "last"); +- +- r_xprt->rx_stats.read_chunk_count++; +- seg += n; +- nsegs -= n; +- } while (nsegs); +- +- /* Finish Read list */ +- *iptr++ = xdr_zero; /* Next item not present */ +- return iptr; +-} +- +-/* XDR-encode the Write list. Supports encoding a list containing +- * one array of plain segments that belong to a single write chunk. +- * +- * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): +- * + * Write chunklist (a list of (one) counted array): + * N elements: + * 1 - N - HLOO - HLOO - ... - HLOO - 0 + * +- * Returns a pointer to the XDR word in the RDMA header following +- * the end of the Write list, or an error pointer. 
+- */ +-static __be32 * +-rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, +- struct rpc_rqst *rqst, __be32 *iptr, +- enum rpcrdma_chunktype wtype) +-{ +- struct rpcrdma_mr_seg *seg; +- struct rpcrdma_mw *mw; +- int n, nsegs, nchunks; +- __be32 *segcount; +- +- if (wtype != rpcrdma_writech) { +- *iptr++ = xdr_zero; /* no Write list present */ +- return iptr; +- } +- +- seg = req->rl_segments; +- nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, +- rqst->rq_rcv_buf.head[0].iov_len, +- wtype, seg); +- if (nsegs < 0) +- return ERR_PTR(nsegs); +- +- *iptr++ = xdr_one; /* Write list present */ +- segcount = iptr++; /* save location of segment count */ +- +- nchunks = 0; +- do { +- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, +- true, &mw); +- if (n < 0) +- return ERR_PTR(n); +- list_add(&mw->mw_list, &req->rl_registered); +- +- iptr = xdr_encode_rdma_segment(iptr, mw); +- +- dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", +- rqst->rq_task->tk_pid, __func__, +- mw->mw_length, (unsigned long long)mw->mw_offset, +- mw->mw_handle, n < nsegs ? "more" : "last"); +- +- r_xprt->rx_stats.write_chunk_count++; +- r_xprt->rx_stats.total_rdma_request += seg->mr_len; +- nchunks++; +- seg += n; +- nsegs -= n; +- } while (nsegs); +- +- /* Update count of segments in this Write chunk */ +- *segcount = cpu_to_be32(nchunks); +- +- /* Finish Write list */ +- *iptr++ = xdr_zero; /* Next item not present */ +- return iptr; +-} +- +-/* XDR-encode the Reply chunk. Supports encoding an array of plain +- * segments that belong to a single write (reply) chunk. +- * +- * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): +- * + * Reply chunk (a counted array): + * N elements: + * 1 - N - HLOO - HLOO - ... - HLOO + * +- * Returns a pointer to the XDR word in the RDMA header following +- * the end of the Reply chunk, or an error pointer. ++ * Returns positive RPC/RDMA header size, or negative errno. 
+ */ +-static __be32 * +-rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, +- struct rpcrdma_req *req, struct rpc_rqst *rqst, +- __be32 *iptr, enum rpcrdma_chunktype wtype) ++ ++static ssize_t ++rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, ++ struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) + { +- struct rpcrdma_mr_seg *seg; +- struct rpcrdma_mw *mw; +- int n, nsegs, nchunks; +- __be32 *segcount; +- +- if (wtype != rpcrdma_replych) { +- *iptr++ = xdr_zero; /* no Reply chunk present */ +- return iptr; ++ struct rpcrdma_req *req = rpcr_to_rdmar(rqst); ++ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); ++ int n, nsegs, nchunks = 0; ++ unsigned int pos; ++ struct rpcrdma_mr_seg *seg = req->rl_segments; ++ struct rpcrdma_read_chunk *cur_rchunk = NULL; ++ struct rpcrdma_write_array *warray = NULL; ++ struct rpcrdma_write_chunk *cur_wchunk = NULL; ++ __be32 *iptr = headerp->rm_body.rm_chunks; ++ int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); ++ ++ if (type == rpcrdma_readch || type == rpcrdma_areadch) { ++ /* a read chunk - server will RDMA Read our memory */ ++ cur_rchunk = (struct rpcrdma_read_chunk *) iptr; ++ } else { ++ /* a write or reply chunk - server will RDMA Write our memory */ ++ *iptr++ = xdr_zero; /* encode a NULL read chunk list */ ++ if (type == rpcrdma_replych) ++ *iptr++ = xdr_zero; /* a NULL write chunk list */ ++ warray = (struct rpcrdma_write_array *) iptr; ++ cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); + } + +- seg = req->rl_segments; +- nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg); +- if (nsegs < 0) +- return ERR_PTR(nsegs); ++ if (type == rpcrdma_replych || type == rpcrdma_areadch) ++ pos = 0; ++ else ++ pos = target->head[0].iov_len; + +- *iptr++ = xdr_one; /* Reply chunk present */ +- segcount = iptr++; /* save location of segment count */ ++ nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); ++ if (nsegs < 0) ++ return nsegs; + +- nchunks = 0; ++ map = r_xprt->rx_ia.ri_ops->ro_map; + do { +- n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, +- true, &mw); +- if (n < 0) +- return ERR_PTR(n); +- list_add(&mw->mw_list, &req->rl_registered); +- +- iptr = xdr_encode_rdma_segment(iptr, mw); +- +- dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", +- rqst->rq_task->tk_pid, __func__, +- mw->mw_length, (unsigned long long)mw->mw_offset, +- mw->mw_handle, n < nsegs ? "more" : "last"); +- +- r_xprt->rx_stats.reply_chunk_count++; +- r_xprt->rx_stats.total_rdma_request += seg->mr_len; ++ n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); ++ if (n <= 0) ++ goto out; ++ if (cur_rchunk) { /* read */ ++ cur_rchunk->rc_discrim = xdr_one; ++ /* all read chunks have the same "position" */ ++ cur_rchunk->rc_position = cpu_to_be32(pos); ++ cur_rchunk->rc_target.rs_handle = ++ cpu_to_be32(seg->mr_rkey); ++ cur_rchunk->rc_target.rs_length = ++ cpu_to_be32(seg->mr_len); ++ xdr_encode_hyper( ++ (__be32 *)&cur_rchunk->rc_target.rs_offset, ++ seg->mr_base); ++ dprintk("RPC: %s: read chunk " ++ "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__, ++ seg->mr_len, (unsigned long long)seg->mr_base, ++ seg->mr_rkey, pos, n < nsegs ? 
"more" : "last"); ++ cur_rchunk++; ++ r_xprt->rx_stats.read_chunk_count++; ++ } else { /* write/reply */ ++ cur_wchunk->wc_target.rs_handle = ++ cpu_to_be32(seg->mr_rkey); ++ cur_wchunk->wc_target.rs_length = ++ cpu_to_be32(seg->mr_len); ++ xdr_encode_hyper( ++ (__be32 *)&cur_wchunk->wc_target.rs_offset, ++ seg->mr_base); ++ dprintk("RPC: %s: %s chunk " ++ "elem %d@0x%llx:0x%x (%s)\n", __func__, ++ (type == rpcrdma_replych) ? "reply" : "write", ++ seg->mr_len, (unsigned long long)seg->mr_base, ++ seg->mr_rkey, n < nsegs ? "more" : "last"); ++ cur_wchunk++; ++ if (type == rpcrdma_replych) ++ r_xprt->rx_stats.reply_chunk_count++; ++ else ++ r_xprt->rx_stats.write_chunk_count++; ++ r_xprt->rx_stats.total_rdma_request += seg->mr_len; ++ } + nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + +- /* Update count of segments in the Reply chunk */ +- *segcount = cpu_to_be32(nchunks); ++ /* success. all failures return above */ ++ req->rl_nchunks = nchunks; ++ ++ /* ++ * finish off header. If write, marshal discrim and nchunks. ++ */ ++ if (cur_rchunk) { ++ iptr = (__be32 *) cur_rchunk; ++ *iptr++ = xdr_zero; /* finish the read chunk list */ ++ *iptr++ = xdr_zero; /* encode a NULL write chunk list */ ++ *iptr++ = xdr_zero; /* encode a NULL reply chunk */ ++ } else { ++ warray->wc_discrim = xdr_one; ++ warray->wc_nchunks = cpu_to_be32(nchunks); ++ iptr = (__be32 *) cur_wchunk; ++ if (type == rpcrdma_writech) { ++ *iptr++ = xdr_zero; /* finish the write chunk list */ ++ *iptr++ = xdr_zero; /* encode a NULL reply chunk */ ++ } ++ } ++ ++ /* ++ * Return header size. ++ */ ++ return (unsigned char *)iptr - (unsigned char *)headerp; + +- return iptr; ++out: ++ for (pos = 0; nchunks--;) ++ pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, ++ &req->rl_segments[pos]); ++ return n; + } + + /* +@@ -554,10 +440,13 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) + * Marshal a request: the primary job of this routine is to choose + * the transfer modes. See comments below. + * +- * Prepares up to two IOVs per Call message: +- * +- * [0] -- RPC RDMA header +- * [1] -- the RPC header/data ++ * Uses multiple RDMA IOVs for a request: ++ * [0] -- RPC RDMA header, which uses memory from the *start* of the ++ * preregistered buffer that already holds the RPC data in ++ * its middle. ++ * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. ++ * [2] -- optional padding. ++ * [3] -- if padded, header only in [1] and data here. + * + * Returns zero on success, otherwise a negative errno. + */ +@@ -568,18 +457,24 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) + struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_req *req = rpcr_to_rdmar(rqst); ++ char *base; ++ size_t rpclen; ++ ssize_t hdrlen; + enum rpcrdma_chunktype rtype, wtype; + struct rpcrdma_msg *headerp; +- bool ddp_allowed; +- ssize_t hdrlen; +- size_t rpclen; +- __be32 *iptr; + + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) + return rpcrdma_bc_marshal_reply(rqst); + #endif + ++ /* ++ * rpclen gets amount of data in first buffer, which is the ++ * pre-registered buffer. 
++ */ ++ base = rqst->rq_svec[0].iov_base; ++ rpclen = rqst->rq_svec[0].iov_len; ++ + headerp = rdmab_to_msg(req->rl_rdmabuf); + /* don't byte-swap XID, it's already done in request */ + headerp->rm_xid = rqst->rq_xid; +@@ -587,26 +482,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) + headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); + headerp->rm_type = rdma_msg; + +- /* When the ULP employs a GSS flavor that guarantees integrity +- * or privacy, direct data placement of individual data items +- * is not allowed. +- */ +- ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags & +- RPCAUTH_AUTH_DATATOUCH); +- + /* + * Chunks needed for results? + * ++ * o Read ops return data as write chunk(s), header as inline. + * o If the expected result is under the inline threshold, all ops + * return as inline. +- * o Large read ops return data as write chunk(s), header as +- * inline. + * o Large non-read ops return as a single reply chunk. + */ +- if (rpcrdma_results_inline(r_xprt, rqst)) +- wtype = rpcrdma_noch; +- else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) ++ if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + wtype = rpcrdma_writech; ++ else if (rpcrdma_results_inline(rqst)) ++ wtype = rpcrdma_noch; + else + wtype = rpcrdma_replych; + +@@ -624,14 +511,10 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) + * that both has a data payload, and whose non-data arguments + * by themselves are larger than the inline threshold. + */ +- if (rpcrdma_args_inline(r_xprt, rqst)) { ++ if (rpcrdma_args_inline(rqst)) { + rtype = rpcrdma_noch; +- rpcrdma_inline_pullup(rqst); +- rpclen = rqst->rq_svec[0].iov_len; +- } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { ++ } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + rtype = rpcrdma_readch; +- rpclen = rqst->rq_svec[0].iov_len; +- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); + } else { + r_xprt->rx_stats.nomsg_call_count++; + headerp->rm_type = htonl(RDMA_NOMSG); +@@ -639,48 +522,57 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) + rpclen = 0; + } + +- /* This implementation supports the following combinations +- * of chunk lists in one RPC-over-RDMA Call message: +- * +- * - Read list +- * - Write list +- * - Reply chunk +- * - Read list + Reply chunk +- * +- * It might not yet support the following combinations: +- * +- * - Read list + Write list +- * +- * It does not support the following combinations: +- * +- * - Write list + Reply chunk +- * - Read list + Write list + Reply chunk +- * +- * This implementation supports only a single chunk in each +- * Read or Write list. Thus for example the client cannot +- * send a Call message with a Position Zero Read chunk and a +- * regular Read chunk at the same time. ++ /* The following simplification is not true forever */ ++ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) ++ wtype = rpcrdma_noch; ++ if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { ++ dprintk("RPC: %s: cannot marshal multiple chunk lists\n", ++ __func__); ++ return -EIO; ++ } ++ ++ hdrlen = RPCRDMA_HDRLEN_MIN; ++ ++ /* ++ * Pull up any extra send data into the preregistered buffer. ++ * When padding is in use and applies to the transfer, insert ++ * it and change the message type. 
+ */ +- iptr = headerp->rm_body.rm_chunks; +- iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); +- if (IS_ERR(iptr)) +- goto out_unmap; +- iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype); +- if (IS_ERR(iptr)) +- goto out_unmap; +- iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype); +- if (IS_ERR(iptr)) +- goto out_unmap; +- hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; +- +- if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) +- goto out_overflow; +- +- dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", +- rqst->rq_task->tk_pid, __func__, +- transfertypes[rtype], transfertypes[wtype], +- hdrlen, rpclen); ++ if (rtype == rpcrdma_noch) { ++ ++ rpcrdma_inline_pullup(rqst); ++ ++ headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; ++ headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; ++ headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; ++ /* new length after pullup */ ++ rpclen = rqst->rq_svec[0].iov_len; ++ } else if (rtype == rpcrdma_readch) ++ rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); ++ if (rtype != rpcrdma_noch) { ++ hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, ++ headerp, rtype); ++ wtype = rtype; /* simplify dprintk */ ++ ++ } else if (wtype != rpcrdma_noch) { ++ hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, ++ headerp, wtype); ++ } ++ if (hdrlen < 0) ++ return hdrlen; + ++ dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" ++ " headerp 0x%p base 0x%p lkey 0x%x\n", ++ __func__, transfertypes[wtype], hdrlen, rpclen, ++ headerp, base, rdmab_lkey(req->rl_rdmabuf)); ++ ++ /* ++ * initialize send_iov's - normally only two: rdma chunk header and ++ * single preregistered RPC header buffer, but if padding is present, ++ * then use a preregistered (and zeroed) pad buffer between the RPC ++ * header and any write data. In all non-rdma cases, any following ++ * data has been copied into the RPC header buffer. ++ */ + req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); + req->rl_send_iov[0].length = hdrlen; + req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); +@@ -695,15 +587,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) + + req->rl_niovs = 2; + return 0; +- +-out_overflow: +- pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", +- hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); +- iptr = ERR_PTR(-EIO); +- +-out_unmap: +- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); +- return PTR_ERR(iptr); + } + + /* +@@ -711,13 +594,15 @@ out_unmap: + * RDMA'd by server. See map at rpcrdma_create_chunks()! 
:-) + */ + static int +-rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp) ++rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp) + { + unsigned int i, total_len; + struct rpcrdma_write_chunk *cur_wchunk; + char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); + + i = be32_to_cpu(**iptrp); ++ if (i > max) ++ return -1; + cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); + total_len = 0; + while (i--) { +@@ -748,66 +633,45 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp) + return total_len; + } + +-/** +- * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs +- * @rqst: controlling RPC request +- * @srcp: points to RPC message payload in receive buffer +- * @copy_len: remaining length of receive buffer content +- * @pad: Write chunk pad bytes needed (zero for pure inline) +- * +- * The upper layer has set the maximum number of bytes it can +- * receive in each component of rq_rcv_buf. These values are set in +- * the head.iov_len, page_len, tail.iov_len, and buflen fields. +- * +- * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in +- * many cases this function simply updates iov_base pointers in +- * rq_rcv_buf to point directly to the received reply data, to +- * avoid copying reply data. +- * +- * Returns the count of bytes which had to be memcopied. ++/* ++ * Scatter inline received data back into provided iov's. + */ +-static unsigned long ++static void + rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) + { +- unsigned long fixup_copy_count; +- int i, npages, curlen; ++ int i, npages, curlen, olen; + char *destp; + struct page **ppages; + int page_base; + +- /* The head iovec is redirected to the RPC reply message +- * in the receive buffer, to avoid a memcopy. +- */ +- rqst->rq_rcv_buf.head[0].iov_base = srcp; +- rqst->rq_private_buf.head[0].iov_base = srcp; +- +- /* The contents of the receive buffer that follow +- * head.iov_len bytes are copied into the page list. 
+- */ + curlen = rqst->rq_rcv_buf.head[0].iov_len; +- if (curlen > copy_len) ++ if (curlen > copy_len) { /* write chunk header fixup */ + curlen = copy_len; ++ rqst->rq_rcv_buf.head[0].iov_len = curlen; ++ } ++ + dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n", + __func__, srcp, copy_len, curlen); ++ ++ /* Shift pointer for first receive segment only */ ++ rqst->rq_rcv_buf.head[0].iov_base = srcp; + srcp += curlen; + copy_len -= curlen; + ++ olen = copy_len; ++ i = 0; ++ rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; + page_base = rqst->rq_rcv_buf.page_base; + ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT); + page_base &= ~PAGE_MASK; +- fixup_copy_count = 0; +- if (copy_len && rqst->rq_rcv_buf.page_len) { +- int pagelist_len; + +- pagelist_len = rqst->rq_rcv_buf.page_len; +- if (pagelist_len > copy_len) +- pagelist_len = copy_len; +- npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT; +- for (i = 0; i < npages; i++) { ++ if (copy_len && rqst->rq_rcv_buf.page_len) { ++ npages = PAGE_ALIGN(page_base + ++ rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT; ++ for (; i < npages; i++) { + curlen = PAGE_SIZE - page_base; +- if (curlen > pagelist_len) +- curlen = pagelist_len; +- ++ if (curlen > copy_len) ++ curlen = copy_len; + dprintk("RPC: %s: page %d" + " srcp 0x%p len %d curlen %d\n", + __func__, i, srcp, copy_len, curlen); +@@ -817,32 +681,39 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) + kunmap_atomic(destp); + srcp += curlen; + copy_len -= curlen; +- fixup_copy_count += curlen; +- pagelist_len -= curlen; +- if (!pagelist_len) ++ if (copy_len == 0) + break; + page_base = 0; + } +- +- /* Implicit padding for the last segment in a Write +- * chunk is inserted inline at the front of the tail +- * iovec. The upper layer ignores the content of +- * the pad. Simply ensure inline content in the tail +- * that follows the Write chunk is properly aligned. +- */ +- if (pad) +- srcp -= pad; + } + +- /* The tail iovec is redirected to the remaining data +- * in the receive buffer, to avoid a memcopy. 
+- */ +- if (copy_len || pad) { +- rqst->rq_rcv_buf.tail[0].iov_base = srcp; +- rqst->rq_private_buf.tail[0].iov_base = srcp; ++ if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { ++ curlen = copy_len; ++ if (curlen > rqst->rq_rcv_buf.tail[0].iov_len) ++ curlen = rqst->rq_rcv_buf.tail[0].iov_len; ++ if (rqst->rq_rcv_buf.tail[0].iov_base != srcp) ++ memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen); ++ dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n", ++ __func__, srcp, copy_len, curlen); ++ rqst->rq_rcv_buf.tail[0].iov_len = curlen; ++ copy_len -= curlen; ++i; ++ } else ++ rqst->rq_rcv_buf.tail[0].iov_len = 0; ++ ++ if (pad) { ++ /* implicit padding on terminal chunk */ ++ unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base; ++ while (pad--) ++ p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0; + } + +- return fixup_copy_count; ++ if (copy_len) ++ dprintk("RPC: %s: %d bytes in" ++ " %d extra segments (%d lost)\n", ++ __func__, olen, i, copy_len); ++ ++ /* TBD avoid a warning from call_decode() */ ++ rqst->rq_private_buf = rqst->rq_rcv_buf; + } + + void +@@ -978,13 +849,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) + (headerp->rm_body.rm_chunks[1] == xdr_zero && + headerp->rm_body.rm_chunks[2] != xdr_zero) || + (headerp->rm_body.rm_chunks[1] != xdr_zero && +- list_empty(&req->rl_registered))) ++ req->rl_nchunks == 0)) + goto badheader; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) { + /* count any expected write chunks in read reply */ + /* start at write chunk array count */ + iptr = &headerp->rm_body.rm_chunks[2]; +- rdmalen = rpcrdma_count_chunks(rep, 1, &iptr); ++ rdmalen = rpcrdma_count_chunks(rep, ++ req->rl_nchunks, 1, &iptr); + /* check for validity, and no reply chunk after */ + if (rdmalen < 0 || *iptr++ != xdr_zero) + goto badheader; +@@ -1005,10 +877,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) + rep->rr_len -= RPCRDMA_HDRLEN_MIN; + status = rep->rr_len; + } +- +- r_xprt->rx_stats.fixup_copy_count += +- rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, +- rdmalen); ++ /* Fix up the rpc results for upper layer */ ++ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); + break; + + case rdma_nomsg: +@@ -1016,11 +886,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) + if (headerp->rm_body.rm_chunks[0] != xdr_zero || + headerp->rm_body.rm_chunks[1] != xdr_zero || + headerp->rm_body.rm_chunks[2] != xdr_one || +- list_empty(&req->rl_registered)) ++ req->rl_nchunks == 0) + goto badheader; + iptr = (__be32 *)((unsigned char *)headerp + + RPCRDMA_HDRLEN_MIN); +- rdmalen = rpcrdma_count_chunks(rep, 0, &iptr); ++ rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); + if (rdmalen < 0) + goto badheader; + r_xprt->rx_stats.total_rdma_reply += rdmalen; +@@ -1033,9 +903,14 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) + + badheader: + default: +- dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", +- rqst->rq_task->tk_pid, __func__, +- be32_to_cpu(headerp->rm_type)); ++ dprintk("%s: invalid rpcrdma reply header (type %d):" ++ " chunks[012] == %d %d %d" ++ " expected chunks <= %d\n", ++ __func__, be32_to_cpu(headerp->rm_type), ++ headerp->rm_body.rm_chunks[0], ++ headerp->rm_body.rm_chunks[1], ++ headerp->rm_body.rm_chunks[2], ++ req->rl_nchunks); + status = -EIO; + r_xprt->rx_stats.bad_reply_count++; + break; +@@ -1049,7 +924,7 @@ out: + * control: waking the next RPC waits until this RPC has + * relinquished all its Send Queue entries. 
+ */ +- if (!list_empty(&req->rl_registered)) ++ if (req->rl_nchunks) + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); + + spin_lock_bh(&xprt->transport_lock); +diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c +index 0ba9887..765bca4 100644 +--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c ++++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c +@@ -145,32 +145,19 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend) + return (__be32 *)&ary->wc_array[nchunks]; + } + +-/** +- * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header +- * @rq_arg: Receive buffer +- * +- * On entry, xdr->head[0].iov_base points to first byte in the +- * RPC-over-RDMA header. +- * +- * On successful exit, head[0] points to first byte past the +- * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message. +- * The length of the RPC-over-RDMA header is returned. +- */ +-int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) ++int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp) + { +- struct rpcrdma_msg *rmsgp; + __be32 *va, *vaend; + unsigned int len; + u32 hdr_len; + + /* Verify that there's enough bytes for header + something */ +- if (rq_arg->len <= RPCRDMA_HDRLEN_ERR) { ++ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) { + dprintk("svcrdma: header too short = %d\n", +- rq_arg->len); ++ rqstp->rq_arg.len); + return -EINVAL; + } + +- rmsgp = (struct rpcrdma_msg *)rq_arg->head[0].iov_base; + if (rmsgp->rm_vers != rpcrdma_version) { + dprintk("%s: bad version %u\n", __func__, + be32_to_cpu(rmsgp->rm_vers)); +@@ -202,10 +189,10 @@ int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) + be32_to_cpu(rmsgp->rm_body.rm_padded.rm_thresh); + + va = &rmsgp->rm_body.rm_padded.rm_pempty[4]; +- rq_arg->head[0].iov_base = va; ++ rqstp->rq_arg.head[0].iov_base = va; + len = (u32)((unsigned long)va - (unsigned long)rmsgp); +- rq_arg->head[0].iov_len -= len; +- if (len > rq_arg->len) ++ rqstp->rq_arg.head[0].iov_len -= len; ++ if (len > rqstp->rq_arg.len) + return -EINVAL; + return len; + default: +@@ -218,7 +205,7 @@ int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) + * chunk list and a reply chunk list. 
+ */ + va = &rmsgp->rm_body.rm_chunks[0]; +- vaend = (__be32 *)((unsigned long)rmsgp + rq_arg->len); ++ vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len); + va = decode_read_list(va, vaend); + if (!va) { + dprintk("svcrdma: failed to decode read list\n"); +@@ -235,9 +222,10 @@ int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) + return -EINVAL; + } + +- rq_arg->head[0].iov_base = va; ++ rqstp->rq_arg.head[0].iov_base = va; + hdr_len = (unsigned long)va - (unsigned long)rmsgp; +- rq_arg->head[0].iov_len -= hdr_len; ++ rqstp->rq_arg.head[0].iov_len -= hdr_len; ++ + return hdr_len; + } + +diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +index 2c25606..3b24a64 100644 +--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c ++++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +@@ -447,8 +447,10 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, + head->arg.len = rqstp->rq_arg.len; + head->arg.buflen = rqstp->rq_arg.buflen; + +- /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ ++ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; + position = be32_to_cpu(ch->rc_position); ++ ++ /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ + if (position == 0) { + head->arg.pages = &head->pages[0]; + page_offset = head->byte_len; +@@ -486,7 +488,7 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, + if (page_offset & 3) { + u32 pad = 4 - (page_offset & 3); + +- head->arg.tail[0].iov_len += pad; ++ head->arg.page_len += pad; + head->arg.len += pad; + head->arg.buflen += pad; + page_offset += pad; +@@ -508,10 +510,11 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, + return ret; + } + +-static void rdma_read_complete(struct svc_rqst *rqstp, +- struct svc_rdma_op_ctxt *head) ++static int rdma_read_complete(struct svc_rqst *rqstp, ++ struct svc_rdma_op_ctxt *head) + { + int page_no; ++ int ret; + + /* Copy RPC pages */ + for (page_no = 0; page_no < head->count; page_no++) { +@@ -547,6 +550,23 @@ static void rdma_read_complete(struct svc_rqst *rqstp, + rqstp->rq_arg.tail[0] = head->arg.tail[0]; + rqstp->rq_arg.len = head->arg.len; + rqstp->rq_arg.buflen = head->arg.buflen; ++ ++ /* Free the context */ ++ svc_rdma_put_context(head, 0); ++ ++ /* XXX: What should this be? */ ++ rqstp->rq_prot = IPPROTO_MAX; ++ svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt); ++ ++ ret = rqstp->rq_arg.head[0].iov_len ++ + rqstp->rq_arg.page_len ++ + rqstp->rq_arg.tail[0].iov_len; ++ dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, " ++ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n", ++ ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, ++ rqstp->rq_arg.head[0].iov_len); ++ ++ return ret; + } + + /* By convention, backchannel calls arrive via rdma_msg type +@@ -604,8 +624,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) + dto_q); + list_del_init(&ctxt->dto_q); + spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock); +- rdma_read_complete(rqstp, ctxt); +- goto complete; ++ return rdma_read_complete(rqstp, ctxt); + } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { + ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next, + struct svc_rdma_op_ctxt, +@@ -636,7 +655,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) + + /* Decode the RDMA header. 
*/ + rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; +- ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg); ++ ret = svc_rdma_xdr_decode_req(rmsgp, rqstp); + if (ret < 0) + goto out_err; + if (ret == 0) +@@ -663,7 +682,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) + return 0; + } + +-complete: + ret = rqstp->rq_arg.head[0].iov_len + + rqstp->rq_arg.page_len + + rqstp->rq_arg.tail[0].iov_len; +diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c +index 54d53330..4f1b1c4 100644 +--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c ++++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c +@@ -463,21 +463,25 @@ static int send_reply(struct svcxprt_rdma *rdma, + struct svc_rqst *rqstp, + struct page *page, + struct rpcrdma_msg *rdma_resp, ++ struct svc_rdma_op_ctxt *ctxt, + struct svc_rdma_req_map *vec, + int byte_count) + { +- struct svc_rdma_op_ctxt *ctxt; + struct ib_send_wr send_wr; + u32 xdr_off; + int sge_no; + int sge_bytes; + int page_no; + int pages; +- int ret = -EIO; ++ int ret; ++ ++ ret = svc_rdma_repost_recv(rdma, GFP_KERNEL); ++ if (ret) { ++ svc_rdma_put_context(ctxt, 0); ++ return -ENOTCONN; ++ } + + /* Prepare the context */ +- ctxt = svc_rdma_get_context(rdma); +- ctxt->direction = DMA_TO_DEVICE; + ctxt->pages[0] = page; + ctxt->count = 1; + +@@ -561,7 +565,8 @@ static int send_reply(struct svcxprt_rdma *rdma, + err: + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); +- return ret; ++ pr_err("svcrdma: failed to send reply, rc=%d\n", ret); ++ return -EIO; + } + + void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) +@@ -580,6 +585,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) + int ret; + int inline_bytes; + struct page *res_page; ++ struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_req_map *vec; + + dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); +@@ -592,6 +598,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) + rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary); + + /* Build an req vec for the XDR */ ++ ctxt = svc_rdma_get_context(rdma); ++ ctxt->direction = DMA_TO_DEVICE; + vec = svc_rdma_get_req_map(rdma); + ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL); + if (ret) +@@ -627,12 +635,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) + inline_bytes -= ret; + } + +- /* Post a fresh Receive buffer _before_ sending the reply */ +- ret = svc_rdma_post_recv(rdma, GFP_KERNEL); +- if (ret) +- goto err1; +- +- ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, ++ ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec, + inline_bytes); + if (ret < 0) + goto err1; +@@ -645,8 +648,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) + put_page(res_page); + err0: + svc_rdma_put_req_map(rdma, vec); +- pr_err("svcrdma: Could not send reply, err=%d. 
Closing transport.\n", +- ret); ++ svc_rdma_put_context(ctxt, 0); + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + return -ENOTCONN; + } +diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c +index dd94401..9066896 100644 +--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c ++++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c +@@ -789,7 +789,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, + int ret; + + dprintk("svcrdma: Creating RDMA socket\n"); +- if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) { ++ if (sa->sa_family != AF_INET) { + dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family); + return ERR_PTR(-EAFNOSUPPORT); + } +@@ -805,16 +805,6 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, + goto err0; + } + +- /* Allow both IPv4 and IPv6 sockets to bind a single port +- * at the same time. +- */ +-#if IS_ENABLED(CONFIG_IPV6) +- ret = rdma_set_afonly(listen_id, 1); +- if (ret) { +- dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret); +- goto err1; +- } +-#endif + ret = rdma_bind_addr(listen_id, sa); + if (ret) { + dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); +@@ -1083,7 +1073,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) + newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV; + + /* Post receive buffers */ +- for (i = 0; i < newxprt->sc_max_requests; i++) { ++ for (i = 0; i < newxprt->sc_rq_depth; i++) { + ret = svc_rdma_post_recv(newxprt, GFP_KERNEL); + if (ret) { + dprintk("svcrdma: failure posting receive buffers\n"); +@@ -1180,9 +1170,6 @@ static void __svc_rdma_free(struct work_struct *work) + + dprintk("svcrdma: %s(%p)\n", __func__, rdma); + +- if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) +- ib_drain_qp(rdma->sc_qp); +- + /* We should only be called from kref_put */ + if (atomic_read(&xprt->xpt_ref.refcount) != 0) + pr_err("svcrdma: sc_xprt still in use? 
(%d)\n", +diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c +index 81f0e87..b1b009f 100644 +--- a/net/sunrpc/xprtrdma/transport.c ++++ b/net/sunrpc/xprtrdma/transport.c +@@ -73,8 +73,6 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; + + static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; + static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; +-static unsigned int min_inline_size = RPCRDMA_MIN_INLINE; +-static unsigned int max_inline_size = RPCRDMA_MAX_INLINE; + static unsigned int zero; + static unsigned int max_padding = PAGE_SIZE; + static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; +@@ -98,8 +96,6 @@ static struct ctl_table xr_tunables_table[] = { + .maxlen = sizeof(unsigned int), + .mode = 0644, + .proc_handler = proc_dointvec, +- .extra1 = &min_inline_size, +- .extra2 = &max_inline_size, + }, + { + .procname = "rdma_max_inline_write", +@@ -107,8 +103,6 @@ static struct ctl_table xr_tunables_table[] = { + .maxlen = sizeof(unsigned int), + .mode = 0644, + .proc_handler = proc_dointvec, +- .extra1 = &min_inline_size, +- .extra2 = &max_inline_size, + }, + { + .procname = "rdma_inline_write_padding", +@@ -514,7 +508,6 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) + out: + dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); + req->rl_connect_cookie = 0; /* our reserved value */ +- req->rl_task = task; + return req->rl_sendbuf->rg_base; + + out_rdmabuf: +@@ -558,6 +551,7 @@ out_sendbuf: + + out_fail: + rpcrdma_buffer_put(req); ++ r_xprt->rx_stats.failed_marshal_count++; + return NULL; + } + +@@ -570,6 +564,7 @@ xprt_rdma_free(void *buffer) + struct rpcrdma_req *req; + struct rpcrdma_xprt *r_xprt; + struct rpcrdma_regbuf *rb; ++ int i; + + if (buffer == NULL) + return; +@@ -583,25 +578,17 @@ xprt_rdma_free(void *buffer) + + dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); + +- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, +- !RPC_IS_ASYNC(req->rl_task)); ++ for (i = 0; req->rl_nchunks;) { ++ --req->rl_nchunks; ++ i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, ++ &req->rl_segments[i]); ++ } + + rpcrdma_buffer_put(req); + } + +-/** +- * xprt_rdma_send_request - marshal and send an RPC request +- * @task: RPC task with an RPC message in rq_snd_buf +- * +- * Return values: +- * 0: The request has been sent +- * ENOTCONN: Caller needs to invoke connect logic then call again +- * ENOBUFS: Call again later to send the request +- * EIO: A permanent error occurred. The request was not sent, +- * and don't try it again +- * ++/* + * send_request invokes the meat of RPC RDMA. It must do the following: +- * + * 1. Marshal the RPC request into an RPC RDMA request, which means + * putting a header in front of data, and creating IOVs for RDMA + * from those in the request. +@@ -610,6 +597,7 @@ xprt_rdma_free(void *buffer) + * the request (rpcrdma_ep_post). + * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). 
+ */ ++ + static int + xprt_rdma_send_request(struct rpc_task *task) + { +@@ -619,9 +607,6 @@ xprt_rdma_send_request(struct rpc_task *task) + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + int rc = 0; + +- /* On retransmit, remove any previously registered chunks */ +- r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); +- + rc = rpcrdma_marshal_req(rqst); + if (rc < 0) + goto failed_marshal; +@@ -642,12 +627,11 @@ xprt_rdma_send_request(struct rpc_task *task) + return 0; + + failed_marshal: ++ r_xprt->rx_stats.failed_marshal_count++; + dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n", + __func__, rc); + if (rc == -EIO) +- r_xprt->rx_stats.failed_marshal_count++; +- if (rc != -ENOTCONN) +- return rc; ++ return -EIO; + drop_connection: + xprt_disconnect_done(xprt); + return -ENOTCONN; /* implies disconnect */ +@@ -673,7 +657,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) + xprt->stat.bad_xids, + xprt->stat.req_u, + xprt->stat.bklog_u); +- seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ", ++ seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n", + r_xprt->rx_stats.read_chunk_count, + r_xprt->rx_stats.write_chunk_count, + r_xprt->rx_stats.reply_chunk_count, +@@ -685,10 +669,6 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) + r_xprt->rx_stats.failed_marshal_count, + r_xprt->rx_stats.bad_reply_count, + r_xprt->rx_stats.nomsg_call_count); +- seq_printf(seq, "%lu %lu %lu\n", +- r_xprt->rx_stats.mrs_recovered, +- r_xprt->rx_stats.mrs_orphaned, +- r_xprt->rx_stats.mrs_allocated); + } + + static int +@@ -727,7 +707,6 @@ static struct rpc_xprt_ops xprt_rdma_procs = { + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + .bc_setup = xprt_rdma_bc_setup, + .bc_up = xprt_rdma_bc_up, +- .bc_maxpayload = xprt_rdma_bc_maxpayload, + .bc_free_rqst = xprt_rdma_bc_free_rqst, + .bc_destroy = xprt_rdma_bc_destroy, + #endif +@@ -758,6 +737,7 @@ void xprt_rdma_cleanup(void) + __func__, rc); + + rpcrdma_destroy_wq(); ++ frwr_destroy_recovery_wq(); + + rc = xprt_unregister_transport(&xprt_rdma_bc); + if (rc) +@@ -769,13 +749,20 @@ int xprt_rdma_init(void) + { + int rc; + +- rc = rpcrdma_alloc_wq(); ++ rc = frwr_alloc_recovery_wq(); + if (rc) + return rc; + ++ rc = rpcrdma_alloc_wq(); ++ if (rc) { ++ frwr_destroy_recovery_wq(); ++ return rc; ++ } ++ + rc = xprt_register_transport(&xprt_rdma); + if (rc) { + rpcrdma_destroy_wq(); ++ frwr_destroy_recovery_wq(); + return rc; + } + +@@ -783,6 +770,7 @@ int xprt_rdma_init(void) + if (rc) { + xprt_unregister_transport(&xprt_rdma); + rpcrdma_destroy_wq(); ++ frwr_destroy_recovery_wq(); + return rc; + } + +diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c +index 799cce6..f5ed9f9 100644 +--- a/net/sunrpc/xprtrdma/verbs.c ++++ b/net/sunrpc/xprtrdma/verbs.c +@@ -51,7 +51,6 @@ + #include + #include + #include +-#include + #include + #include /* try_module_get()/module_put() */ + +@@ -204,6 +203,15 @@ out_fail: + goto out_schedule; + } + ++static void ++rpcrdma_flush_cqs(struct rpcrdma_ep *ep) ++{ ++ struct ib_wc wc; ++ ++ while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) ++ rpcrdma_receive_wc(NULL, &wc); ++} ++ + static int + rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) + { +@@ -366,6 +374,23 @@ out: + } + + /* ++ * Drain any cq, prior to teardown. 
++ */ ++static void ++rpcrdma_clean_cq(struct ib_cq *cq) ++{ ++ struct ib_wc wc; ++ int count = 0; ++ ++ while (1 == ib_poll_cq(cq, 1, &wc)) ++ ++count; ++ ++ if (count) ++ dprintk("RPC: %s: flushed %d events (last 0x%x)\n", ++ __func__, count, wc.opcode); ++} ++ ++/* + * Exported functions. + */ + +@@ -380,6 +405,8 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) + struct rpcrdma_ia *ia = &xprt->rx_ia; + int rc; + ++ ia->ri_dma_mr = NULL; ++ + ia->ri_id = rpcrdma_create_id(xprt, ia, addr); + if (IS_ERR(ia->ri_id)) { + rc = PTR_ERR(ia->ri_id); +@@ -390,30 +417,49 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) + ia->ri_pd = ib_alloc_pd(ia->ri_device); + if (IS_ERR(ia->ri_pd)) { + rc = PTR_ERR(ia->ri_pd); +- pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); ++ dprintk("RPC: %s: ib_alloc_pd() failed %i\n", ++ __func__, rc); + goto out2; + } + ++ if (memreg == RPCRDMA_FRMR) { ++ if (!(ia->ri_device->attrs.device_cap_flags & ++ IB_DEVICE_MEM_MGT_EXTENSIONS) || ++ (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) { ++ dprintk("RPC: %s: FRMR registration " ++ "not supported by HCA\n", __func__); ++ memreg = RPCRDMA_MTHCAFMR; ++ } ++ } ++ if (memreg == RPCRDMA_MTHCAFMR) { ++ if (!ia->ri_device->alloc_fmr) { ++ dprintk("RPC: %s: MTHCAFMR registration " ++ "not supported by HCA\n", __func__); ++ rc = -EINVAL; ++ goto out3; ++ } ++ } ++ + switch (memreg) { + case RPCRDMA_FRMR: +- if (frwr_is_supported(ia)) { +- ia->ri_ops = &rpcrdma_frwr_memreg_ops; +- break; +- } +- /*FALLTHROUGH*/ ++ ia->ri_ops = &rpcrdma_frwr_memreg_ops; ++ break; ++ case RPCRDMA_ALLPHYSICAL: ++ ia->ri_ops = &rpcrdma_physical_memreg_ops; ++ break; + case RPCRDMA_MTHCAFMR: +- if (fmr_is_supported(ia)) { +- ia->ri_ops = &rpcrdma_fmr_memreg_ops; +- break; +- } +- /*FALLTHROUGH*/ ++ ia->ri_ops = &rpcrdma_fmr_memreg_ops; ++ break; + default: +- pr_err("rpcrdma: Unsupported memory registration mode: %d\n", +- memreg); +- rc = -EINVAL; ++ printk(KERN_ERR "RPC: Unsupported memory " ++ "registration mode: %d\n", memreg); ++ rc = -ENOMEM; + goto out3; + } ++ dprintk("RPC: %s: memory registration strategy is '%s'\n", ++ __func__, ia->ri_ops->ro_displayname); + ++ rwlock_init(&ia->ri_qplock); + return 0; + + out3: +@@ -469,7 +515,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, + __func__); + return -ENOMEM; + } +- max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; ++ max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; + + /* check provider's send/recv wr limits */ + if (cdata->max_requests > max_qp_wr) +@@ -480,13 +526,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, + ep->rep_attr.srq = NULL; + ep->rep_attr.cap.max_send_wr = cdata->max_requests; + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; +- ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ + rc = ia->ri_ops->ro_open(ia, ep, cdata); + if (rc) + return rc; + ep->rep_attr.cap.max_recv_wr = cdata->max_requests; + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; +- ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ + ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; + ep->rep_attr.cap.max_recv_sge = 1; + ep->rep_attr.cap.max_inline_data = 0; +@@ -534,7 +578,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, + ep->rep_attr.recv_cq = recvcq; + + /* Initialize cma parameters */ +- memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); + + /* RPC/RDMA does not use private data */ + 
ep->rep_remote_cma.private_data = NULL; +@@ -548,16 +591,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, + ep->rep_remote_cma.responder_resources = + ia->ri_device->attrs.max_qp_rd_atom; + +- /* Limit transport retries so client can detect server +- * GID changes quickly. RPC layer handles re-establishing +- * transport connection and retransmission. +- */ +- ep->rep_remote_cma.retry_count = 6; +- +- /* RPC-over-RDMA handles its own flow control. In addition, +- * make all RNR NAKs visible so we know that RPC-over-RDMA +- * flow control is working correctly (no NAKs should be seen). +- */ ++ ep->rep_remote_cma.retry_count = 7; + ep->rep_remote_cma.flow_control = 0; + ep->rep_remote_cma.rnr_retry_count = 0; + +@@ -566,6 +600,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, + out2: + ib_free_cq(sendcq); + out1: ++ if (ia->ri_dma_mr) ++ ib_dereg_mr(ia->ri_dma_mr); + return rc; + } + +@@ -579,19 +615,32 @@ out1: + void + rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) + { ++ int rc; ++ + dprintk("RPC: %s: entering, connected is %d\n", + __func__, ep->rep_connected); + + cancel_delayed_work_sync(&ep->rep_connect_worker); + +- if (ia->ri_id->qp) { ++ if (ia->ri_id->qp) + rpcrdma_ep_disconnect(ep, ia); ++ ++ rpcrdma_clean_cq(ep->rep_attr.recv_cq); ++ rpcrdma_clean_cq(ep->rep_attr.send_cq); ++ ++ if (ia->ri_id->qp) { + rdma_destroy_qp(ia->ri_id); + ia->ri_id->qp = NULL; + } + + ib_free_cq(ep->rep_attr.recv_cq); + ib_free_cq(ep->rep_attr.send_cq); ++ ++ if (ia->ri_dma_mr) { ++ rc = ib_dereg_mr(ia->ri_dma_mr); ++ dprintk("RPC: %s: ib_dereg_mr returned %i\n", ++ __func__, rc); ++ } + } + + /* +@@ -610,6 +659,7 @@ retry: + dprintk("RPC: %s: reconnecting...\n", __func__); + + rpcrdma_ep_disconnect(ep, ia); ++ rpcrdma_flush_cqs(ep); + + xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); + id = rpcrdma_create_id(xprt, ia, +@@ -642,8 +692,10 @@ retry: + goto out; + } + ++ write_lock(&ia->ri_qplock); + old = ia->ri_id; + ia->ri_id = id; ++ write_unlock(&ia->ri_qplock); + + rdma_destroy_qp(old); + rpcrdma_destroy_id(old); +@@ -733,6 +785,7 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) + { + int rc; + ++ rpcrdma_flush_cqs(ep); + rc = rdma_disconnect(ia->ri_id); + if (!rc) { + /* returns without wait if not connected */ +@@ -744,92 +797,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) + dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); + ep->rep_connected = rc; + } +- +- ib_drain_qp(ia->ri_id->qp); +-} +- +-static void +-rpcrdma_mr_recovery_worker(struct work_struct *work) +-{ +- struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, +- rb_recovery_worker.work); +- struct rpcrdma_mw *mw; +- +- spin_lock(&buf->rb_recovery_lock); +- while (!list_empty(&buf->rb_stale_mrs)) { +- mw = list_first_entry(&buf->rb_stale_mrs, +- struct rpcrdma_mw, mw_list); +- list_del_init(&mw->mw_list); +- spin_unlock(&buf->rb_recovery_lock); +- +- dprintk("RPC: %s: recovering MR %p\n", __func__, mw); +- mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); +- +- spin_lock(&buf->rb_recovery_lock); +- } +- spin_unlock(&buf->rb_recovery_lock); +-} +- +-void +-rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) +-{ +- struct rpcrdma_xprt *r_xprt = mw->mw_xprt; +- struct rpcrdma_buffer *buf = &r_xprt->rx_buf; +- +- spin_lock(&buf->rb_recovery_lock); +- list_add(&mw->mw_list, &buf->rb_stale_mrs); +- spin_unlock(&buf->rb_recovery_lock); +- +- schedule_delayed_work(&buf->rb_recovery_worker, 0); +-} +- +-static void 
+-rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) +-{ +- struct rpcrdma_buffer *buf = &r_xprt->rx_buf; +- struct rpcrdma_ia *ia = &r_xprt->rx_ia; +- unsigned int count; +- LIST_HEAD(free); +- LIST_HEAD(all); +- +- for (count = 0; count < 32; count++) { +- struct rpcrdma_mw *mw; +- int rc; +- +- mw = kzalloc(sizeof(*mw), GFP_KERNEL); +- if (!mw) +- break; +- +- rc = ia->ri_ops->ro_init_mr(ia, mw); +- if (rc) { +- kfree(mw); +- break; +- } +- +- mw->mw_xprt = r_xprt; +- +- list_add(&mw->mw_list, &free); +- list_add(&mw->mw_all, &all); +- } +- +- spin_lock(&buf->rb_mwlock); +- list_splice(&free, &buf->rb_mws); +- list_splice(&all, &buf->rb_all); +- r_xprt->rx_stats.mrs_allocated += count; +- spin_unlock(&buf->rb_mwlock); +- +- dprintk("RPC: %s: created %u MRs\n", __func__, count); +-} +- +-static void +-rpcrdma_mr_refresh_worker(struct work_struct *work) +-{ +- struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, +- rb_refresh_worker.work); +- struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, +- rx_buf); +- +- rpcrdma_create_mrs(r_xprt); + } + + struct rpcrdma_req * +@@ -848,7 +815,6 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) + spin_unlock(&buffer->rb_reqslock); + req->rl_cqe.done = rpcrdma_wc_send; + req->rl_buffer = &r_xprt->rx_buf; +- INIT_LIST_HEAD(&req->rl_registered); + return req; + } + +@@ -888,23 +854,17 @@ int + rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) + { + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; ++ struct rpcrdma_ia *ia = &r_xprt->rx_ia; + int i, rc; + + buf->rb_max_requests = r_xprt->rx_data.max_requests; + buf->rb_bc_srv_max_requests = 0; +- atomic_set(&buf->rb_credits, 1); +- spin_lock_init(&buf->rb_mwlock); + spin_lock_init(&buf->rb_lock); +- spin_lock_init(&buf->rb_recovery_lock); +- INIT_LIST_HEAD(&buf->rb_mws); +- INIT_LIST_HEAD(&buf->rb_all); +- INIT_LIST_HEAD(&buf->rb_stale_mrs); +- INIT_DELAYED_WORK(&buf->rb_refresh_worker, +- rpcrdma_mr_refresh_worker); +- INIT_DELAYED_WORK(&buf->rb_recovery_worker, +- rpcrdma_mr_recovery_worker); ++ atomic_set(&buf->rb_credits, 1); + +- rpcrdma_create_mrs(r_xprt); ++ rc = ia->ri_ops->ro_init(r_xprt); ++ if (rc) ++ goto out; + + INIT_LIST_HEAD(&buf->rb_send_bufs); + INIT_LIST_HEAD(&buf->rb_allreqs); +@@ -924,7 +884,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) + } + + INIT_LIST_HEAD(&buf->rb_recv_bufs); +- for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) { ++ for (i = 0; i < buf->rb_max_requests + 2; i++) { + struct rpcrdma_rep *rep; + + rep = rpcrdma_create_rep(r_xprt); +@@ -980,46 +940,17 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) + kfree(req); + } + +-static void +-rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) +-{ +- struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, +- rx_buf); +- struct rpcrdma_ia *ia = rdmab_to_ia(buf); +- struct rpcrdma_mw *mw; +- unsigned int count; +- +- count = 0; +- spin_lock(&buf->rb_mwlock); +- while (!list_empty(&buf->rb_all)) { +- mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); +- list_del(&mw->mw_all); +- +- spin_unlock(&buf->rb_mwlock); +- ia->ri_ops->ro_release_mr(mw); +- count++; +- spin_lock(&buf->rb_mwlock); +- } +- spin_unlock(&buf->rb_mwlock); +- r_xprt->rx_stats.mrs_allocated = 0; +- +- dprintk("RPC: %s: released %u MRs\n", __func__, count); +-} +- + void + rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) + { + struct rpcrdma_ia *ia = rdmab_to_ia(buf); + +- cancel_delayed_work_sync(&buf->rb_recovery_worker); +- + while 
(!list_empty(&buf->rb_recv_bufs)) { + struct rpcrdma_rep *rep; + + rep = rpcrdma_buffer_get_rep_locked(buf); + rpcrdma_destroy_rep(ia, rep); + } +- buf->rb_send_count = 0; + + spin_lock(&buf->rb_reqslock); + while (!list_empty(&buf->rb_allreqs)) { +@@ -1034,9 +965,8 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) + spin_lock(&buf->rb_reqslock); + } + spin_unlock(&buf->rb_reqslock); +- buf->rb_recv_count = 0; + +- rpcrdma_destroy_mrs(buf); ++ ia->ri_ops->ro_destroy(buf); + } + + struct rpcrdma_mw * +@@ -1054,17 +984,8 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) + spin_unlock(&buf->rb_mwlock); + + if (!mw) +- goto out_nomws; ++ pr_err("RPC: %s: no MWs available\n", __func__); + return mw; +- +-out_nomws: +- dprintk("RPC: %s: no MWs available\n", __func__); +- schedule_delayed_work(&buf->rb_refresh_worker, 0); +- +- /* Allow the reply handler and refresh worker to run */ +- cond_resched(); +- +- return NULL; + } + + void +@@ -1077,23 +998,6 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) + spin_unlock(&buf->rb_mwlock); + } + +-static struct rpcrdma_rep * +-rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers) +-{ +- /* If an RPC previously completed without a reply (say, a +- * credential problem or a soft timeout occurs) then hold off +- * on supplying more Receive buffers until the number of new +- * pending RPCs catches up to the number of posted Receives. +- */ +- if (unlikely(buffers->rb_send_count < buffers->rb_recv_count)) +- return NULL; +- +- if (unlikely(list_empty(&buffers->rb_recv_bufs))) +- return NULL; +- buffers->rb_recv_count++; +- return rpcrdma_buffer_get_rep_locked(buffers); +-} +- + /* + * Get a set of request/reply buffers. + * +@@ -1107,9 +1011,10 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) + spin_lock(&buffers->rb_lock); + if (list_empty(&buffers->rb_send_bufs)) + goto out_reqbuf; +- buffers->rb_send_count++; + req = rpcrdma_buffer_get_req_locked(buffers); +- req->rl_reply = rpcrdma_buffer_get_rep(buffers); ++ if (list_empty(&buffers->rb_recv_bufs)) ++ goto out_repbuf; ++ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock(&buffers->rb_lock); + return req; + +@@ -1117,6 +1022,11 @@ out_reqbuf: + spin_unlock(&buffers->rb_lock); + pr_warn("RPC: %s: out of request buffers\n", __func__); + return NULL; ++out_repbuf: ++ spin_unlock(&buffers->rb_lock); ++ pr_warn("RPC: %s: out of reply buffers\n", __func__); ++ req->rl_reply = NULL; ++ return req; + } + + /* +@@ -1133,12 +1043,9 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) + req->rl_reply = NULL; + + spin_lock(&buffers->rb_lock); +- buffers->rb_send_count--; + list_add_tail(&req->rl_free, &buffers->rb_send_bufs); +- if (rep) { +- buffers->rb_recv_count--; ++ if (rep) + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); +- } + spin_unlock(&buffers->rb_lock); + } + +@@ -1152,7 +1059,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) + struct rpcrdma_buffer *buffers = req->rl_buffer; + + spin_lock(&buffers->rb_lock); +- req->rl_reply = rpcrdma_buffer_get_rep(buffers); ++ if (!list_empty(&buffers->rb_recv_bufs)) ++ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock(&buffers->rb_lock); + } + +@@ -1166,7 +1074,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) + struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; + + spin_lock(&buffers->rb_lock); +- buffers->rb_recv_count--; + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock(&buffers->rb_lock); + } +@@ -1175,6 +1082,14 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) 
+ * Wrappers for internal-use kmalloc memory registration, used by buffer code. + */ + ++void ++rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg) ++{ ++ dprintk("RPC: map_one: offset %p iova %llx len %zu\n", ++ seg->mr_offset, ++ (unsigned long long)seg->mr_dma, seg->mr_dmalen); ++} ++ + /** + * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers + * @ia: controlling rpcrdma_ia +@@ -1257,7 +1172,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, + if (rep) { + rc = rpcrdma_ep_post_recv(ia, ep, rep); + if (rc) +- return rc; ++ goto out; + req->rl_reply = NULL; + } + +@@ -1282,12 +1197,10 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, + + rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail); + if (rc) +- goto out_postsend_err; +- return 0; +- +-out_postsend_err: +- pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc); +- return -ENOTCONN; ++ dprintk("RPC: %s: ib_post_send returned %i\n", __func__, ++ rc); ++out: ++ return rc; + } + + /* +@@ -1312,13 +1225,11 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, + DMA_BIDIRECTIONAL); + + rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); +- if (rc) +- goto out_postrecv; +- return 0; + +-out_postrecv: +- pr_err("rpcrdma: ib_post_recv returned %i\n", rc); +- return -ENOTCONN; ++ if (rc) ++ dprintk("RPC: %s: ib_post_recv returned %i\n", __func__, ++ rc); ++ return rc; + } + + /** +@@ -1360,3 +1271,25 @@ out_rc: + rpcrdma_recv_buffer_put(rep); + return rc; + } ++ ++/* How many chunk list items fit within our inline buffers? ++ */ ++unsigned int ++rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt) ++{ ++ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; ++ int bytes, segments; ++ ++ bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize); ++ bytes -= RPCRDMA_HDRLEN_MIN; ++ if (bytes < sizeof(struct rpcrdma_segment) * 2) { ++ pr_warn("RPC: %s: inline threshold too small\n", ++ __func__); ++ return 0; ++ } ++ ++ segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1); ++ dprintk("RPC: %s: max chunk list size = %d segments\n", ++ __func__, segments); ++ return segments; ++} +diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h +index a71b0f5..2ebc743 100644 +--- a/net/sunrpc/xprtrdma/xprt_rdma.h ++++ b/net/sunrpc/xprtrdma/xprt_rdma.h +@@ -65,14 +65,14 @@ + */ + struct rpcrdma_ia { + const struct rpcrdma_memreg_ops *ri_ops; ++ rwlock_t ri_qplock; + struct ib_device *ri_device; + struct rdma_cm_id *ri_id; + struct ib_pd *ri_pd; ++ struct ib_mr *ri_dma_mr; + struct completion ri_done; + int ri_async_rc; + unsigned int ri_max_frmr_depth; +- unsigned int ri_max_inline_write; +- unsigned int ri_max_inline_read; + struct ib_qp_attr ri_qp_attr; + struct ib_qp_init_attr ri_qp_init_attr; + }; +@@ -144,26 +144,6 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) + + #define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) + +-/* To ensure a transport can always make forward progress, +- * the number of RDMA segments allowed in header chunk lists +- * is capped at 8. This prevents less-capable devices and +- * memory registrations from overrunning the Send buffer +- * while building chunk lists. +- * +- * Elements of the Read list take up more room than the +- * Write list or Reply chunk. 8 read segments means the Read +- * list (or Write list or Reply chunk) cannot consume more +- * than +- * +- * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. +- * +- * And the fixed part of the header is another 24 bytes. 
+- * +- * The smallest inline threshold is 1024 bytes, ensuring that +- * at least 750 bytes are available for RPC messages. +- */ +-#define RPCRDMA_MAX_HDR_SEGS (8) +- + /* + * struct rpcrdma_rep -- this structure encapsulates state required to recv + * and complete a reply, asychronously. It needs several pieces of +@@ -171,14 +151,21 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) + * o recv buffer (posted to provider) + * o ib_sge (also donated to provider) + * o status of reply (length, success or not) +- * o bookkeeping state to get run by reply handler (list, etc) ++ * o bookkeeping state to get run by tasklet (list, etc) + * +- * These are allocated during initialization, per-transport instance. ++ * These are allocated during initialization, per-transport instance; ++ * however, the tasklet execution list itself is global, as it should ++ * always be pretty short. + * + * N of these are associated with a transport instance, and stored in + * struct rpcrdma_buffer. N is the max number of outstanding requests. + */ + ++#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) ++#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ ++ ++struct rpcrdma_buffer; ++ + struct rpcrdma_rep { + struct ib_cqe rr_cqe; + unsigned int rr_len; +@@ -211,10 +198,14 @@ enum rpcrdma_frmr_state { + }; + + struct rpcrdma_frmr { ++ struct scatterlist *sg; ++ int sg_nents; + struct ib_mr *fr_mr; + struct ib_cqe fr_cqe; + enum rpcrdma_frmr_state fr_state; + struct completion fr_linv_done; ++ struct work_struct fr_work; ++ struct rpcrdma_xprt *fr_xprt; + union { + struct ib_reg_wr fr_regwr; + struct ib_send_wr fr_invwr; +@@ -222,23 +213,16 @@ struct rpcrdma_frmr { + }; + + struct rpcrdma_fmr { +- struct ib_fmr *fm_mr; +- u64 *fm_physaddrs; ++ struct ib_fmr *fmr; ++ u64 *physaddrs; + }; + + struct rpcrdma_mw { +- struct list_head mw_list; +- struct scatterlist *mw_sg; +- int mw_nents; +- enum dma_data_direction mw_dir; + union { + struct rpcrdma_fmr fmr; + struct rpcrdma_frmr frmr; + }; +- struct rpcrdma_xprt *mw_xprt; +- u32 mw_handle; +- u32 mw_length; +- u64 mw_offset; ++ struct list_head mw_list; + struct list_head mw_all; + }; + +@@ -258,44 +242,44 @@ struct rpcrdma_mw { + * of iovs for send operations. The reason is that the iovs passed to + * ib_post_{send,recv} must not be modified until the work request + * completes. ++ * ++ * NOTES: ++ * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we ++ * marshal. The number needed varies depending on the iov lists that ++ * are passed to us, the memory registration mode we are in, and if ++ * physical addressing is used, the layout. + */ + +-/* Maximum number of page-sized "segments" per chunk list to be +- * registered or invalidated. 
Must handle a Reply chunk: +- */ +-enum { +- RPCRDMA_MAX_IOV_SEGS = 3, +- RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1, +- RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS + +- RPCRDMA_MAX_IOV_SEGS, +-}; +- + struct rpcrdma_mr_seg { /* chunk descriptors */ ++ struct rpcrdma_mw *rl_mw; /* registered MR */ ++ u64 mr_base; /* registration result */ ++ u32 mr_rkey; /* registration result */ + u32 mr_len; /* length of chunk or segment */ ++ int mr_nsegs; /* number of segments in chunk or 0 */ ++ enum dma_data_direction mr_dir; /* segment mapping direction */ ++ dma_addr_t mr_dma; /* segment mapping address */ ++ size_t mr_dmalen; /* segment mapping length */ + struct page *mr_page; /* owning page, if any */ + char *mr_offset; /* kva if no page, else offset */ + }; + + #define RPCRDMA_MAX_IOVS (2) + +-struct rpcrdma_buffer; + struct rpcrdma_req { + struct list_head rl_free; + unsigned int rl_niovs; ++ unsigned int rl_nchunks; + unsigned int rl_connect_cookie; +- struct rpc_task *rl_task; + struct rpcrdma_buffer *rl_buffer; + struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ + struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; + struct rpcrdma_regbuf *rl_rdmabuf; + struct rpcrdma_regbuf *rl_sendbuf; ++ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + + struct ib_cqe rl_cqe; + struct list_head rl_all; + bool rl_backchannel; +- +- struct list_head rl_registered; /* registered segments */ +- struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + }; + + static inline struct rpcrdma_req * +@@ -321,7 +305,6 @@ struct rpcrdma_buffer { + char *rb_pool; + + spinlock_t rb_lock; /* protect buf lists */ +- int rb_send_count, rb_recv_count; + struct list_head rb_send_bufs; + struct list_head rb_recv_bufs; + u32 rb_max_requests; +@@ -332,11 +315,6 @@ struct rpcrdma_buffer { + struct list_head rb_allreqs; + + u32 rb_bc_max_requests; +- +- spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */ +- struct list_head rb_stale_mrs; +- struct delayed_work rb_recovery_worker; +- struct delayed_work rb_refresh_worker; + }; + #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) + +@@ -383,9 +361,6 @@ struct rpcrdma_stats { + unsigned long bad_reply_count; + unsigned long nomsg_call_count; + unsigned long bcall_count; +- unsigned long mrs_recovered; +- unsigned long mrs_orphaned; +- unsigned long mrs_allocated; + }; + + /* +@@ -394,25 +369,23 @@ struct rpcrdma_stats { + struct rpcrdma_xprt; + struct rpcrdma_memreg_ops { + int (*ro_map)(struct rpcrdma_xprt *, +- struct rpcrdma_mr_seg *, int, bool, +- struct rpcrdma_mw **); ++ struct rpcrdma_mr_seg *, int, bool); + void (*ro_unmap_sync)(struct rpcrdma_xprt *, + struct rpcrdma_req *); +- void (*ro_unmap_safe)(struct rpcrdma_xprt *, +- struct rpcrdma_req *, bool); +- void (*ro_recover_mr)(struct rpcrdma_mw *); ++ int (*ro_unmap)(struct rpcrdma_xprt *, ++ struct rpcrdma_mr_seg *); + int (*ro_open)(struct rpcrdma_ia *, + struct rpcrdma_ep *, + struct rpcrdma_create_data_internal *); + size_t (*ro_maxpages)(struct rpcrdma_xprt *); +- int (*ro_init_mr)(struct rpcrdma_ia *, +- struct rpcrdma_mw *); +- void (*ro_release_mr)(struct rpcrdma_mw *); ++ int (*ro_init)(struct rpcrdma_xprt *); ++ void (*ro_destroy)(struct rpcrdma_buffer *); + const char *ro_displayname; + }; + + extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops; + extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops; ++extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops; + + /* + * RPCRDMA transport -- encapsulates the structures above for 
+@@ -447,8 +420,6 @@ extern int xprt_rdma_pad_optimize; + */ + int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int); + void rpcrdma_ia_close(struct rpcrdma_ia *); +-bool frwr_is_supported(struct rpcrdma_ia *); +-bool fmr_is_supported(struct rpcrdma_ia *); + + /* + * Endpoint calls - xprtrdma/verbs.c +@@ -480,15 +451,17 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); + void rpcrdma_recv_buffer_get(struct rpcrdma_req *); + void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); + +-void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); +- + struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, + size_t, gfp_t); + void rpcrdma_free_regbuf(struct rpcrdma_ia *, + struct rpcrdma_regbuf *); + ++unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); + int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); + ++int frwr_alloc_recovery_wq(void); ++void frwr_destroy_recovery_wq(void); ++ + int rpcrdma_alloc_wq(void); + void rpcrdma_destroy_wq(void); + +@@ -496,12 +469,45 @@ void rpcrdma_destroy_wq(void); + * Wrappers for chunk registration, shared by read/write chunk code. + */ + ++void rpcrdma_mapping_error(struct rpcrdma_mr_seg *); ++ + static inline enum dma_data_direction + rpcrdma_data_dir(bool writing) + { + return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE; + } + ++static inline void ++rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg, ++ enum dma_data_direction direction) ++{ ++ seg->mr_dir = direction; ++ seg->mr_dmalen = seg->mr_len; ++ ++ if (seg->mr_page) ++ seg->mr_dma = ib_dma_map_page(device, ++ seg->mr_page, offset_in_page(seg->mr_offset), ++ seg->mr_dmalen, seg->mr_dir); ++ else ++ seg->mr_dma = ib_dma_map_single(device, ++ seg->mr_offset, ++ seg->mr_dmalen, seg->mr_dir); ++ ++ if (ib_dma_mapping_error(device, seg->mr_dma)) ++ rpcrdma_mapping_error(seg); ++} ++ ++static inline void ++rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg) ++{ ++ if (seg->mr_page) ++ ib_dma_unmap_page(device, ++ seg->mr_dma, seg->mr_dmalen, seg->mr_dir); ++ else ++ ib_dma_unmap_single(device, ++ seg->mr_dma, seg->mr_dmalen, seg->mr_dir); ++} ++ + /* + * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c + */ +@@ -513,9 +519,6 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *); + * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c + */ + int rpcrdma_marshal_req(struct rpc_rqst *); +-void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *, +- struct rpcrdma_create_data_internal *, +- unsigned int); + + /* RPC/RDMA module init - xprtrdma/transport.c + */ +@@ -531,7 +534,6 @@ void xprt_rdma_cleanup(void); + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); + int xprt_rdma_bc_up(struct svc_serv *, struct net *); +-size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); + int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); + void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); + int rpcrdma_bc_marshal_reply(struct rpc_rqst *); +diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h +index d6917b8..3081339 100644 +--- a/include/linux/sunrpc/svc_rdma.h ++++ b/include/linux/sunrpc/svc_rdma.h +@@ -199,7 +199,7 @@ extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, + struct xdr_buf *rcvbuf); + + /* svc_rdma_marshal.c */ +-extern int svc_rdma_xdr_decode_req(struct xdr_buf *); ++extern int svc_rdma_xdr_decode_req(struct rpcrdma_msg *, struct svc_rqst *); + extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *, + struct rpcrdma_msg 
*, + enum rpcrdma_errcode, __be32 *); -- 2.46.0
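
The rpcrdma_max_segments() helper restored in the verbs.c hunk above rounds the client's chunk-list capacity down to a power of two: it takes the smaller of the two inline thresholds, subtracts the minimum RPC-over-RDMA header, divides by the on-the-wire segment size, and keeps only the highest set bit of the quotient. The standalone sketch below reproduces that arithmetic in plain C; the 4096-byte inline thresholds, 28-byte minimum header, 16-byte segment size, and the fls_sketch() helper are illustrative assumptions for the example, not values taken from this patch (the kernel code reads them from rpcrdma_create_data_internal and the protocol definitions).

/*
 * Sketch of the power-of-two rounding done by rpcrdma_max_segments()
 * in the verbs.c hunk above.  All constants below are assumptions
 * chosen for illustration only.
 */
#include <stdio.h>

/* fls(): 1-based index of the highest set bit, 0 when no bit is set. */
static unsigned int fls_sketch(unsigned int v)
{
	unsigned int bit = 0;

	while (v) {
		bit++;
		v >>= 1;
	}
	return bit;
}

int main(void)
{
	unsigned int inline_wsize = 4096;	/* assumed inline write threshold */
	unsigned int inline_rsize = 4096;	/* assumed inline read threshold */
	unsigned int hdrlen_min = 28;		/* assumed minimum RPC-over-RDMA header */
	unsigned int segment_size = 16;		/* assumed handle + length + offset */
	unsigned int bytes, segments;

	/* Smaller of the two inline thresholds, minus the fixed header. */
	bytes = (inline_wsize < inline_rsize) ? inline_wsize : inline_rsize;
	bytes -= hdrlen_min;
	if (bytes < segment_size * 2) {
		printf("inline threshold too small\n");
		return 1;
	}

	/* Keep only the largest power of two that fits in the remainder. */
	segments = 1u << (fls_sketch(bytes / segment_size) - 1);
	printf("max chunk list size = %u segments\n", segments);
	return 0;
}

With the assumed values the sketch prints 128 segments; in the kernel that figure is then scaled or capped per registration mode through the ro_maxpages callback declared in the xprt_rdma.h hunk above.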