From 3f1af42ad0fad8a12242233dd0d9fc42f5e83415 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 9 Feb 2016 17:50:15 +0100 Subject: libceph: enable large, variable-sized OSD requests Turn r_ops into a flexible array member to enable large, consisting of up to 16 ops, OSD requests. The use case is scattered writeback in cephfs and, as far as the kernel client is concerned, 16 is just a made up number. r_ops had size 3 for copyup+hint+write, but copyup is really a special case - it can only happen once. ceph_osd_request_cache is therefore stuffed with num_ops=2 requests, anything bigger than that is allocated with kmalloc(). req_mempool is backed by ceph_osd_request_cache, which means either num_ops=1 or num_ops=2 for use_mempool=true - all existing users (ceph_writepages_start(), ceph_osdc_writepages()) are fine with that. Signed-off-by: Ilya Dryomov --- net/ceph/osd_client.c | 43 ++++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 15 deletions(-) (limited to 'net/ceph') diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index f93d0e893f3f..ccd4e031fa3f 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -338,9 +338,10 @@ static void ceph_osdc_release_request(struct kref *kref) ceph_put_snap_context(req->r_snapc); if (req->r_mempool) mempool_free(req, req->r_osdc->req_mempool); - else + else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS) kmem_cache_free(ceph_osd_request_cache, req); - + else + kfree(req); } void ceph_osdc_get_request(struct ceph_osd_request *req) @@ -369,18 +370,22 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, struct ceph_msg *msg; size_t msg_size; - BUILD_BUG_ON(CEPH_OSD_MAX_OP > U16_MAX); - BUG_ON(num_ops > CEPH_OSD_MAX_OP); - if (use_mempool) { + BUG_ON(num_ops > CEPH_OSD_SLAB_OPS); req = mempool_alloc(osdc->req_mempool, gfp_flags); - memset(req, 0, sizeof(*req)); + } else if (num_ops <= CEPH_OSD_SLAB_OPS) { + req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags); } else { - req = kmem_cache_zalloc(ceph_osd_request_cache, gfp_flags); + BUG_ON(num_ops > CEPH_OSD_MAX_OPS); + req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]), + gfp_flags); } - if (req == NULL) + if (unlikely(!req)) return NULL; + /* req only, each op is zeroed in _osd_req_op_init() */ + memset(req, 0, sizeof(*req)); + req->r_osdc = osdc; req->r_mempool = use_mempool; req->r_num_ops = num_ops; @@ -398,12 +403,19 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, req->r_base_oloc.pool = -1; req->r_target_oloc.pool = -1; + msg_size = OSD_OPREPLY_FRONT_LEN; + if (num_ops > CEPH_OSD_SLAB_OPS) { + /* ceph_osd_op and rval */ + msg_size += (num_ops - CEPH_OSD_SLAB_OPS) * + (sizeof(struct ceph_osd_op) + 4); + } + /* create reply message */ if (use_mempool) msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0); else - msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, - OSD_OPREPLY_FRONT_LEN, gfp_flags, true); + msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, + gfp_flags, true); if (!msg) { ceph_osdc_put_request(req); return NULL; @@ -1811,7 +1823,7 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg) ceph_decode_need(&p, end, 4, bad_put); numops = ceph_decode_32(&p); - if (numops > CEPH_OSD_MAX_OP) + if (numops > CEPH_OSD_MAX_OPS) goto bad_put; if (numops != req->r_num_ops) goto bad_put; @@ -2784,11 +2796,12 @@ EXPORT_SYMBOL(ceph_osdc_writepages); int ceph_osdc_setup(void) { + size_t size = sizeof(struct ceph_osd_request) + + CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op); + BUG_ON(ceph_osd_request_cache); - ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", - sizeof (struct ceph_osd_request), - __alignof__(struct ceph_osd_request), - 0, NULL); + ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size, + 0, 0, NULL); return ceph_osd_request_cache ? 0 : -ENOMEM; } -- cgit v1.2.3