From fce7a9744bdf3baa9d08c52d2c78f53ab49918a4 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 29 Sep 2018 16:02:19 +0800 Subject: ceph: refactor ceph_sync_read() Avoid allocating memory for the entire user request: striped_read() does a synchronous OSD request per object, so it doesn't need more than object size worth of pages at a time. [ Preserve the comment, changelog. ] Signed-off-by: "Yan, Zheng" Signed-off-by: Ilya Dryomov --- fs/ceph/file.c | 219 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 106 insertions(+), 113 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 92ab20433682..846997481674 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -556,91 +556,27 @@ enum { READ_INLINE = 3, }; -/* - * Read a range of bytes striped over one or more objects. Iterate over - * objects we stripe over. (That's not atomic, but good enough for now.) - * - * If we get a short result from the OSD, check against i_size; we need to - * only return a short read to the caller if we hit EOF. - */ -static int striped_read(struct inode *inode, - u64 pos, u64 len, - struct page **pages, int num_pages, - int page_align, int *checkeof) -{ - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_inode_info *ci = ceph_inode(inode); - u64 this_len; - loff_t i_size; - int page_idx; - int ret, read = 0; - bool hit_stripe, was_short; - - /* - * we may need to do multiple reads. not atomic, unfortunately. - */ -more: - this_len = len; - page_idx = (page_align + read) >> PAGE_SHIFT; - ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), - &ci->i_layout, pos, &this_len, - ci->i_truncate_seq, ci->i_truncate_size, - pages + page_idx, num_pages - page_idx, - ((page_align + read) & ~PAGE_MASK)); - if (ret == -ENOENT) - ret = 0; - hit_stripe = this_len < len; - was_short = ret >= 0 && ret < this_len; - dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read, - ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); - - i_size = i_size_read(inode); - if (ret >= 0) { - if (was_short && (pos + ret < i_size)) { - int zlen = min(this_len - ret, i_size - pos - ret); - int zoff = page_align + read + ret; - dout(" zero gap %llu to %llu\n", - pos + ret, pos + ret + zlen); - ceph_zero_page_vector_range(zoff, zlen, pages); - ret += zlen; - } - - read += ret; - pos += ret; - len -= ret; - - /* hit stripe and need continue*/ - if (len && hit_stripe && pos < i_size) - goto more; - } - - if (read > 0) { - ret = read; - /* did we bounce off eof? */ - if (pos + len > i_size) - *checkeof = CHECK_EOF; - } - - dout("striped_read returns %d\n", ret); - return ret; -} - /* * Completely synchronous read and write methods. Direct from __user * buffer to osd, or directly to user pages (if O_DIRECT). * - * If the read spans object boundary, just do multiple reads. + * If the read spans object boundary, just do multiple reads. (That's not + * atomic, but good enough for now.) + * + * If we get a short result from the OSD, check against i_size; we need to + * only return a short read to the caller if we hit EOF. */ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, - int *checkeof) + int *retry_op) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); - struct page **pages; - u64 off = iocb->ki_pos; - int num_pages; + struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_fs_client *fsc = ceph_inode_to_client(inode); + struct ceph_osd_client *osdc = &fsc->client->osdc; ssize_t ret; - size_t len = iov_iter_count(to); + u64 off = iocb->ki_pos; + u64 len = iov_iter_count(to); dout("sync_read on file %p %llu~%u %s\n", file, off, (unsigned)len, (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); @@ -653,61 +589,118 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to, * but it will at least behave sensibly when they are * in sequence. */ - ret = filemap_write_and_wait_range(inode->i_mapping, off, - off + len); + ret = filemap_write_and_wait_range(inode->i_mapping, off, off + len); if (ret < 0) return ret; - if (unlikely(to->type & ITER_PIPE)) { + ret = 0; + while ((len = iov_iter_count(to)) > 0) { + struct ceph_osd_request *req; + struct page **pages; + int num_pages; size_t page_off; - ret = iov_iter_get_pages_alloc(to, &pages, len, - &page_off); - if (ret <= 0) - return -ENOMEM; - num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + u64 i_size; + bool more; + + req = ceph_osdc_new_request(osdc, &ci->i_layout, + ci->i_vino, off, &len, 0, 1, + CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ, + NULL, ci->i_truncate_seq, + ci->i_truncate_size, false); + if (IS_ERR(req)) { + ret = PTR_ERR(req); + break; + } + + more = len < iov_iter_count(to); - ret = striped_read(inode, off, ret, pages, num_pages, - page_off, checkeof); - if (ret > 0) { - iov_iter_advance(to, ret); - off += ret; + if (unlikely(to->type & ITER_PIPE)) { + ret = iov_iter_get_pages_alloc(to, &pages, len, + &page_off); + if (ret <= 0) { + ceph_osdc_put_request(req); + ret = -ENOMEM; + break; + } + num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE); + if (ret < len) { + len = ret; + osd_req_op_extent_update(req, 0, len); + more = false; + } } else { - iov_iter_advance(to, 0); + num_pages = calc_pages_for(off, len); + page_off = off & ~PAGE_MASK; + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) { + ceph_osdc_put_request(req); + ret = PTR_ERR(pages); + break; + } } - ceph_put_page_vector(pages, num_pages, false); - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = striped_read(inode, off, len, pages, num_pages, - (off & ~PAGE_MASK), checkeof); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, left, - PAGE_SIZE - page_off); - l = copy_page_to_iter(pages[k++], page_off, - copy, to); - off += l; - left -= l; - if (l < copy) + + osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_off, + false, false); + ret = ceph_osdc_start_request(osdc, req, false); + if (!ret) + ret = ceph_osdc_wait_request(osdc, req); + ceph_osdc_put_request(req); + + i_size = i_size_read(inode); + dout("sync_read %llu~%llu got %zd i_size %llu%s\n", + off, len, ret, i_size, (more ? " MORE" : "")); + + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && (off + ret < i_size)) { + int zlen = min(len - ret, i_size - off - ret); + int zoff = page_off + ret; + dout("sync_read zero gap %llu~%llu\n", + off + ret, off + ret + zlen); + ceph_zero_page_vector_range(zoff, zlen, pages); + ret += zlen; + } + + if (unlikely(to->type & ITER_PIPE)) { + if (ret > 0) { + iov_iter_advance(to, ret); + off += ret; + } else { + iov_iter_advance(to, 0); + } + ceph_put_page_vector(pages, num_pages, false); + } else { + int idx = 0; + size_t left = ret > 0 ? ret : 0; + while (left > 0) { + size_t len, copied; + page_off = off & ~PAGE_MASK; + len = min_t(size_t, left, PAGE_SIZE - page_off); + copied = copy_page_to_iter(pages[idx++], + page_off, len, to); + off += copied; + left -= copied; + if (copied < len) { + ret = -EFAULT; break; + } } + ceph_release_page_vector(pages, num_pages); } - ceph_release_page_vector(pages, num_pages); + + if (ret <= 0 || off >= i_size || !more) + break; } if (off > iocb->ki_pos) { + if (ret >= 0 && + iov_iter_count(to) > 0 && off >= i_size_read(inode)) + *retry_op = CHECK_EOF; ret = off - iocb->ki_pos; iocb->ki_pos = off; } - dout("sync_read result %zd\n", ret); + dout("sync_read result %zd retry_op %d\n", ret, *retry_op); return ret; } -- cgit v1.2.3