From 66d7a56a6254389587d0999dcaab1d2634cd4e24 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 11:19:20 -0400 Subject: SUNRPC: Refactor TCP receive Move the TCP data receive loop out of xs_tcp_data_ready(). Doing so will allow us to move the data receive out of the softirq context in a set of followup patches. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 44 +++++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1a85e0ed0b48..fa8d0c15c8cd 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1391,6 +1391,30 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns return len - desc.count; } +static void xs_tcp_data_receive(struct sock_xprt *transport) +{ + struct rpc_xprt *xprt = &transport->xprt; + struct sock *sk; + read_descriptor_t rd_desc = { + .count = 2*1024*1024, + .arg.data = xprt, + }; + unsigned long total = 0; + int read = 0; + + sk = transport->inet; + + /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ + for (;;) { + read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + if (read <= 0) + break; + total += read; + rd_desc.count = 65536; + } + trace_xs_tcp_data_ready(xprt, read, total); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1398,34 +1422,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns */ static void xs_tcp_data_ready(struct sock *sk) { + struct sock_xprt *transport; struct rpc_xprt *xprt; - read_descriptor_t rd_desc; - int read; - unsigned long total = 0; dprintk("RPC: xs_tcp_data_ready...\n"); read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) { - read = 0; + if (!(xprt = xprt_from_sock(sk))) goto out; - } + transport = container_of(xprt, struct sock_xprt, xprt); + /* Any data means we had a useful conversation, so * the we don't need to delay the next reconnect */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; - /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ - rd_desc.arg.data = xprt; - do { - rd_desc.count = 65536; - read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - if (read > 0) - total += read; - } while (read > 0); + xs_tcp_data_receive(transport); out: - trace_xs_tcp_data_ready(xprt, read, total); read_unlock_bh(&sk->sk_callback_lock); } -- cgit v1.2.3 From edc1b01cd3b20a5fff049e98f82a2b0d24a34c89 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Mon, 5 Oct 2015 10:53:49 -0400 Subject: SUNRPC: Move TCP receive data path into a workqueue context Stream protocols such as TCP can often build up a backlog of data to be read due to ordering. Combine this with the fact that some workloads such as NFS read()-intensive workloads need to receive a lot of data per RPC call, and it turns out that receiving the data from inside a softirq context can cause starvation. The following patch moves the TCP data receive into a workqueue context. We still end up calling tcp_read_sock(), but we do so from a process context, meaning that softirqs are enabled for most of the time. With this patch, I see a doubling of read bandwidth when running a multi-threaded iozone workload between a virtual client and server setup. Signed-off-by: Trond Myklebust --- include/linux/sunrpc/xprtsock.h | 2 ++ net/sunrpc/xprtsock.c | 51 +++++++++++++++++++++++++++++------------ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h index 357e44c1a46b..0ece4ba06f06 100644 --- a/include/linux/sunrpc/xprtsock.h +++ b/include/linux/sunrpc/xprtsock.h @@ -44,6 +44,8 @@ struct sock_xprt { */ unsigned long sock_state; struct delayed_work connect_worker; + struct work_struct recv_worker; + struct mutex recv_mutex; struct sockaddr_storage srcaddr; unsigned short srcport; diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index fa8d0c15c8cd..58dc90ccebb6 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -823,6 +823,7 @@ static void xs_reset_transport(struct sock_xprt *transport) kernel_sock_shutdown(sock, SHUT_RDWR); + mutex_lock(&transport->recv_mutex); write_lock_bh(&sk->sk_callback_lock); transport->inet = NULL; transport->sock = NULL; @@ -833,6 +834,7 @@ static void xs_reset_transport(struct sock_xprt *transport) xprt_clear_connected(xprt); write_unlock_bh(&sk->sk_callback_lock); xs_sock_reset_connection_flags(xprt); + mutex_unlock(&transport->recv_mutex); trace_rpc_socket_close(xprt, sock); sock_release(sock); @@ -886,6 +888,7 @@ static void xs_destroy(struct rpc_xprt *xprt) cancel_delayed_work_sync(&transport->connect_worker); xs_close(xprt); + cancel_work_sync(&transport->recv_worker); xs_xprt_free(xprt); module_put(THIS_MODULE); } @@ -1243,12 +1246,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid)); /* Find and lock the request corresponding to this xid */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_rqst(xprt, transport->tcp_xid); if (!req) { dprintk("RPC: XID %08x request not found!\n", ntohl(transport->tcp_xid)); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return -1; } @@ -1257,7 +1260,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_rqst(req->rq_task, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1277,10 +1280,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, struct rpc_rqst *req; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); req = xprt_lookup_bc_request(xprt, transport->tcp_xid); if (req == NULL) { - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); printk(KERN_WARNING "Callback slot table overflowed\n"); xprt_force_disconnect(xprt); return -1; @@ -1291,7 +1294,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt, if (!(transport->tcp_flags & TCP_RCV_COPY_DATA)) xprt_complete_bc_request(req, transport->tcp_copied); - spin_unlock(&xprt->transport_lock); + spin_unlock_bh(&xprt->transport_lock); return 0; } @@ -1402,19 +1405,33 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) unsigned long total = 0; int read = 0; + mutex_lock(&transport->recv_mutex); sk = transport->inet; + if (sk == NULL) + goto out; /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */ for (;;) { + lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); + release_sock(sk); if (read <= 0) break; total += read; rd_desc.count = 65536; } +out: + mutex_unlock(&transport->recv_mutex); trace_xs_tcp_data_ready(xprt, read, total); } +static void xs_tcp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_tcp_data_receive(transport); +} + /** * xs_tcp_data_ready - "data ready" callback for TCP sockets * @sk: socket with data to read @@ -1437,8 +1454,8 @@ static void xs_tcp_data_ready(struct sock *sk) */ if (xprt->reestablish_timeout) xprt->reestablish_timeout = 0; + queue_work(rpciod_workqueue, &transport->recv_worker); - xs_tcp_data_receive(transport); out: read_unlock_bh(&sk->sk_callback_lock); } @@ -1840,6 +1857,10 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif +static void xs_dummy_data_receive_workfn(struct work_struct *work) +{ +} + static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -2664,6 +2685,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, } new = container_of(xprt, struct sock_xprt, xprt); + mutex_init(&new->recv_mutex); memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) @@ -2717,6 +2739,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); @@ -2788,21 +2811,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_udp_setup_socket); xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6); break; default: @@ -2867,21 +2889,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); + INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); + switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); - INIT_DELAYED_WORK(&transport->connect_worker, - xs_tcp_setup_socket); xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6); break; default: -- cgit v1.2.3 From f9b2ee714c5c945cda27e9cbca5f60d5199fb93f Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 16:26:05 -0400 Subject: SUNRPC: Move UDP receive data path into a workqueue context Now that we've done it for TCP, let's convert UDP as well. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 84 +++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 58dc90ccebb6..df8bdcc10640 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -972,42 +972,36 @@ static void xs_local_data_ready(struct sock *sk) } /** - * xs_udp_data_ready - "data ready" callback for UDP sockets - * @sk: socket with data to read + * xs_udp_data_read_skb - receive callback for UDP sockets + * @xprt: transport + * @sk: socket + * @skb: skbuff * */ -static void xs_udp_data_ready(struct sock *sk) +static void xs_udp_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: xs_udp_data_ready...\n"); - if (!(xprt = xprt_from_sock(sk))) - goto out; - - if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL) - goto out; - repsize = skb->len - sizeof(struct udphdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(struct udphdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -1028,10 +1022,54 @@ static void xs_udp_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_udp_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_udp_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_udp_data_receive(transport); +} + +/** + * xs_data_ready - "data ready" callback for UDP sockets + * @sk: socket with data to read + * + */ +static void xs_data_ready(struct sock *sk) +{ + struct rpc_xprt *xprt; + + read_lock_bh(&sk->sk_callback_lock); + dprintk("RPC: xs_data_ready...\n"); + xprt = xprt_from_sock(sk); + if (xprt != NULL) { + struct sock_xprt *transport = container_of(xprt, + struct sock_xprt, xprt); + queue_work(rpciod_workqueue, &transport->recv_worker); + } read_unlock_bh(&sk->sk_callback_lock); } @@ -2094,7 +2132,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_udp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_allocation = GFP_NOIO; @@ -2811,7 +2849,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) xprt->timeout = &xs_udp_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); switch (addr->sa_family) { -- cgit v1.2.3 From a26480942c99d84a7682ea4c00f47a3a42ed41d2 Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 17:03:00 -0400 Subject: SUNRPC: Move AF_LOCAL receive data path into a workqueue context Now that we've done it for TCP and UDP, let's convert AF_LOCAL as well. Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 72 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index df8bdcc10640..1471ecceabf9 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -909,44 +909,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) } /** - * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets - * @sk: socket with data to read + * xs_local_data_read_skb + * @xprt: transport + * @sk: socket + * @skb: skbuff * * Currently this assumes we can read the whole reply in a single gulp. */ -static void xs_local_data_ready(struct sock *sk) +static void xs_local_data_read_skb(struct rpc_xprt *xprt, + struct sock *sk, + struct sk_buff *skb) { struct rpc_task *task; - struct rpc_xprt *xprt; struct rpc_rqst *rovr; - struct sk_buff *skb; - int err, repsize, copied; + int repsize, copied; u32 _xid; __be32 *xp; - read_lock_bh(&sk->sk_callback_lock); - dprintk("RPC: %s...\n", __func__); - xprt = xprt_from_sock(sk); - if (xprt == NULL) - goto out; - - skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) - goto out; - repsize = skb->len - sizeof(rpc_fraghdr); if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d\n", repsize); - goto dropit; + return; } /* Copy the XID from the skb... */ xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid); if (xp == NULL) - goto dropit; + return; /* Look up and lock the request corresponding to the given XID */ - spin_lock(&xprt->transport_lock); + spin_lock_bh(&xprt->transport_lock); rovr = xprt_lookup_rqst(xprt, *xp); if (!rovr) goto out_unlock; @@ -964,11 +956,35 @@ static void xs_local_data_ready(struct sock *sk) xprt_complete_rqst(task, copied); out_unlock: - spin_unlock(&xprt->transport_lock); - dropit: - skb_free_datagram(sk, skb); - out: - read_unlock_bh(&sk->sk_callback_lock); + spin_unlock_bh(&xprt->transport_lock); +} + +static void xs_local_data_receive(struct sock_xprt *transport) +{ + struct sk_buff *skb; + struct sock *sk; + int err; + + mutex_lock(&transport->recv_mutex); + sk = transport->inet; + if (sk == NULL) + goto out; + for (;;) { + skb = skb_recv_datagram(sk, 0, 1, &err); + if (skb == NULL) + break; + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + } +out: + mutex_unlock(&transport->recv_mutex); +} + +static void xs_local_data_receive_workfn(struct work_struct *work) +{ + struct sock_xprt *transport = + container_of(work, struct sock_xprt, recv_worker); + xs_local_data_receive(transport); } /** @@ -1895,10 +1911,6 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } #endif -static void xs_dummy_data_receive_workfn(struct work_struct *work) -{ -} - static void xs_dummy_setup_socket(struct work_struct *work) { } @@ -1946,7 +1958,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_local_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2777,7 +2789,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) xprt->ops = &xs_local_ops; xprt->timeout = &xs_local_default_timeout; - INIT_WORK(&transport->recv_worker, xs_dummy_data_receive_workfn); + INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); -- cgit v1.2.3 From 31303d6cbb24ba94e8b82170213bd2fde6365d9a Mon Sep 17 00:00:00 2001 From: Trond Myklebust Date: Tue, 6 Oct 2015 15:59:20 -0400 Subject: SUNRPC: Use MSG_SENDPAGE_NOTLAST in xs_send_pagedata() If we're sending more than one page via kernel_sendpage(), then set MSG_SENDPAGE_NOTLAST between the pages so that we don't send suboptimal frames (see commit 2f5338442425 and commit 35f9c09fe9c7). Signed-off-by: Trond Myklebust --- net/sunrpc/xprtsock.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 1471ecceabf9..e71aff251ac1 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i int flags = XS_SENDMSG_FLAGS; remainder -= len; - if (remainder != 0 || more) + if (more) flags |= MSG_MORE; + if (remainder != 0) + flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE; err = do_sendpage(sock, *ppage, base, len, flags); if (remainder == 0 || err != len) break; -- cgit v1.2.3