diff options
Diffstat (limited to 'net/rds')
-rw-r--r-- | net/rds/af_rds.c | 14 | ||||
-rw-r--r-- | net/rds/message.c | 126 | ||||
-rw-r--r-- | net/rds/rds.h | 19 | ||||
-rw-r--r-- | net/rds/recv.c | 33 | ||||
-rw-r--r-- | net/rds/send.c | 54 | ||||
-rw-r--r-- | net/rds/tcp.c | 7 |
6 files changed, 225 insertions, 28 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c index 744c637c86b0..f7126108a811 100644 --- a/net/rds/af_rds.c +++ b/net/rds/af_rds.c @@ -77,6 +77,7 @@ static int rds_release(struct socket *sock) rds_send_drop_to(rs, NULL); rds_rdma_drop_keys(rs); rds_notify_queue_get(rs, NULL); + __skb_queue_purge(&rs->rs_zcookie_queue); spin_lock_bh(&rds_sock_lock); list_del_init(&rs->rs_item); @@ -110,7 +111,7 @@ void rds_wake_sk_sleep(struct rds_sock *rs) } static int rds_getname(struct socket *sock, struct sockaddr *uaddr, - int *uaddr_len, int peer) + int peer) { struct sockaddr_in *sin = (struct sockaddr_in *)uaddr; struct rds_sock *rs = rds_sk_to_rs(sock->sk); @@ -131,8 +132,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr, sin->sin_family = AF_INET; - *uaddr_len = sizeof(*sin); - return 0; + return sizeof(*sin); } /* @@ -145,7 +145,7 @@ static int rds_getname(struct socket *sock, struct sockaddr *uaddr, * - to signal that a previously congested destination may have become * uncongested * - A notification has been queued to the socket (this can be a congestion - * update, or a RDMA completion). + * update, or a RDMA completion, or a MSG_ZEROCOPY completion). * * EPOLLOUT is asserted if there is room on the send queue. This does not mean * however, that the next sendmsg() call will succeed. If the application tries @@ -179,10 +179,13 @@ static __poll_t rds_poll(struct file *file, struct socket *sock, spin_unlock(&rs->rs_lock); } if (!list_empty(&rs->rs_recv_queue) || - !list_empty(&rs->rs_notify_queue)) + !list_empty(&rs->rs_notify_queue) || + !skb_queue_empty(&rs->rs_zcookie_queue)) mask |= (EPOLLIN | EPOLLRDNORM); if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) mask |= (EPOLLOUT | EPOLLWRNORM); + if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue)) + mask |= POLLERR; read_unlock_irqrestore(&rs->rs_recv_lock, flags); /* clear state any time we wake a seen-congested socket */ @@ -512,6 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol) INIT_LIST_HEAD(&rs->rs_recv_queue); INIT_LIST_HEAD(&rs->rs_notify_queue); INIT_LIST_HEAD(&rs->rs_cong_list); + skb_queue_head_init(&rs->rs_zcookie_queue); spin_lock_init(&rs->rs_rdma_lock); rs->rs_rdma_keys = RB_ROOT; rs->rs_rx_traces = 0; diff --git a/net/rds/message.c b/net/rds/message.c index 4318cc9b78f7..116cf87ccb89 100644 --- a/net/rds/message.c +++ b/net/rds/message.c @@ -33,6 +33,9 @@ #include <linux/kernel.h> #include <linux/slab.h> #include <linux/export.h> +#include <linux/skbuff.h> +#include <linux/list.h> +#include <linux/errqueue.h> #include "rds.h" @@ -53,20 +56,84 @@ void rds_message_addref(struct rds_message *rm) } EXPORT_SYMBOL_GPL(rds_message_addref); +static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie) +{ + struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb; + int ncookies = ck->num; + + if (ncookies == RDS_MAX_ZCOOKIES) + return false; + ck->cookies[ncookies] = cookie; + ck->num = ++ncookies; + return true; +} + +static void rds_rm_zerocopy_callback(struct rds_sock *rs, + struct rds_znotifier *znotif) +{ + struct sk_buff *skb, *tail; + unsigned long flags; + struct sk_buff_head *q; + u32 cookie = znotif->z_cookie; + struct rds_zcopy_cookies *ck; + + q = &rs->rs_zcookie_queue; + spin_lock_irqsave(&q->lock, flags); + tail = skb_peek_tail(q); + + if (tail && skb_zcookie_add(tail, cookie)) { + spin_unlock_irqrestore(&q->lock, flags); + mm_unaccount_pinned_pages(&znotif->z_mmp); + consume_skb(rds_skb_from_znotifier(znotif)); + /* caller invokes rds_wake_sk_sleep() */ + return; + } + + skb = rds_skb_from_znotifier(znotif); + ck = (struct rds_zcopy_cookies *)skb->cb; + memset(ck, 0, sizeof(*ck)); + WARN_ON(!skb_zcookie_add(skb, cookie)); + + __skb_queue_tail(q, skb); + + spin_unlock_irqrestore(&q->lock, flags); + /* caller invokes rds_wake_sk_sleep() */ + + mm_unaccount_pinned_pages(&znotif->z_mmp); +} + /* * This relies on dma_map_sg() not touching sg[].page during merging. */ static void rds_message_purge(struct rds_message *rm) { - unsigned long i; + unsigned long i, flags; + bool zcopy = false; if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) return; + spin_lock_irqsave(&rm->m_rs_lock, flags); + if (rm->m_rs) { + struct rds_sock *rs = rm->m_rs; + + if (rm->data.op_mmp_znotifier) { + zcopy = true; + rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier); + rds_wake_sk_sleep(rs); + rm->data.op_mmp_znotifier = NULL; + } + sock_put(rds_rs_to_sk(rs)); + rm->m_rs = NULL; + } + spin_unlock_irqrestore(&rm->m_rs_lock, flags); + for (i = 0; i < rm->data.op_nents; i++) { - rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i])); /* XXX will have to put_page for page refs */ - __free_page(sg_page(&rm->data.op_sg[i])); + if (!zcopy) + __free_page(sg_page(&rm->data.op_sg[i])); + else + put_page(sg_page(&rm->data.op_sg[i])); } rm->data.op_nents = 0; @@ -266,12 +333,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in return rm; } -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + bool zcopy) { unsigned long to_copy, nbytes; unsigned long sg_off; struct scatterlist *sg; int ret = 0; + int length = iov_iter_count(from); rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); @@ -281,6 +350,55 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) sg = rm->data.op_sg; sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ + if (zcopy) { + int total_copied = 0; + struct sk_buff *skb; + + skb = alloc_skb(0, GFP_KERNEL); + if (!skb) + return -ENOMEM; + BUILD_BUG_ON(sizeof(skb->cb) < + max_t(int, sizeof(struct rds_znotifier), + sizeof(struct rds_zcopy_cookies))); + rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb); + if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp, + length)) { + ret = -ENOMEM; + goto err; + } + while (iov_iter_count(from)) { + struct page *pages; + size_t start; + ssize_t copied; + + copied = iov_iter_get_pages(from, &pages, PAGE_SIZE, + 1, &start); + if (copied < 0) { + struct mmpin *mmp; + int i; + + for (i = 0; i < rm->data.op_nents; i++) + put_page(sg_page(&rm->data.op_sg[i])); + mmp = &rm->data.op_mmp_znotifier->z_mmp; + mm_unaccount_pinned_pages(mmp); + ret = -EFAULT; + goto err; + } + total_copied += copied; + iov_iter_advance(from, copied); + length -= copied; + sg_set_page(sg, pages, copied, start); + rm->data.op_nents++; + sg++; + } + WARN_ON_ONCE(length != 0); + return ret; +err: + consume_skb(skb); + rm->data.op_mmp_znotifier = NULL; + return ret; + } /* zcopy */ + while (iov_iter_count(from)) { if (!sg_page(sg)) { ret = rds_page_remainder_alloc(sg, iov_iter_count(from), diff --git a/net/rds/rds.h b/net/rds/rds.h index 7301b9b01890..33b16353d8f3 100644 --- a/net/rds/rds.h +++ b/net/rds/rds.h @@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie) #define RDS_MSG_PAGEVEC 7 #define RDS_MSG_FLUSH 8 +struct rds_znotifier { + struct list_head z_list; + struct mmpin z_mmp; + u32 z_cookie; +}; + +#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0])) + +static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z) +{ + return container_of((void *)z, struct sk_buff, cb); +} + struct rds_message { refcount_t m_refcount; struct list_head m_sock_item; @@ -436,6 +449,7 @@ struct rds_message { unsigned int op_count; unsigned int op_dmasg; unsigned int op_dmaoff; + struct rds_znotifier *op_mmp_znotifier; struct scatterlist *op_sg; } data; }; @@ -589,6 +603,8 @@ struct rds_sock { /* Socket receive path trace points*/ u8 rs_rx_traces; u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX]; + + struct sk_buff_head rs_zcookie_queue; }; static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk) @@ -771,7 +787,8 @@ rds_conn_connecting(struct rds_connection *conn) /* message.c */ struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp); struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents); -int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from); +int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from, + bool zcopy); struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len); void rds_message_populate_header(struct rds_header *hdr, __be16 sport, __be16 dport, u64 seq); diff --git a/net/rds/recv.c b/net/rds/recv.c index b25bcfe411ca..d50747725221 100644 --- a/net/rds/recv.c +++ b/net/rds/recv.c @@ -577,6 +577,32 @@ out: return ret; } +static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg) +{ + struct sk_buff *skb; + struct sk_buff_head *q = &rs->rs_zcookie_queue; + struct rds_zcopy_cookies *done; + + if (!msg->msg_control) + return false; + + if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || + msg->msg_controllen < CMSG_SPACE(sizeof(*done))) + return false; + + skb = skb_dequeue(q); + if (!skb) + return false; + done = (struct rds_zcopy_cookies *)skb->cb; + if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done), + done)) { + skb_queue_head(q, skb); + return false; + } + consume_skb(skb); + return true; +} + int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, int msg_flags) { @@ -594,6 +620,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, if (msg_flags & MSG_OOB) goto out; + if (msg_flags & MSG_ERRQUEUE) + return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR); while (1) { /* If there are pending notifications, do those - and nothing else */ @@ -609,7 +637,9 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, if (!rds_next_incoming(rs, &inc)) { if (nonblock) { - ret = -EAGAIN; + bool reaped = rds_recvmsg_zcookie(rs, msg); + + ret = reaped ? 0 : -EAGAIN; break; } @@ -658,6 +688,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size, ret = -EFAULT; goto out; } + rds_recvmsg_zcookie(rs, msg); rds_stats_inc(s_recv_delivered); diff --git a/net/rds/send.c b/net/rds/send.c index b1b0022b8370..acad04243b41 100644 --- a/net/rds/send.c +++ b/net/rds/send.c @@ -649,7 +649,6 @@ static void rds_send_remove_from_sock(struct list_head *messages, int status) rm->rdma.op_notifier = NULL; } was_on_sock = 1; - rm->m_rs = NULL; } spin_unlock(&rs->rs_lock); @@ -756,9 +755,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) */ if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) { spin_unlock_irqrestore(&cp->cp_lock, flags); - spin_lock_irqsave(&rm->m_rs_lock, flags); - rm->m_rs = NULL; - spin_unlock_irqrestore(&rm->m_rs_lock, flags); continue; } list_del_init(&rm->m_conn_item); @@ -774,7 +770,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); - rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); rds_message_put(rm); @@ -798,7 +793,6 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest) __rds_send_complete(rs, rm, RDS_RDMA_CANCELED); spin_unlock(&rs->rs_lock); - rm->m_rs = NULL; spin_unlock_irqrestore(&rm->m_rs_lock, flags); rds_message_put(rm); @@ -849,6 +843,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn, list_add_tail(&rm->m_sock_item, &rs->rs_send_queue); set_bit(RDS_MSG_ON_SOCK, &rm->m_flags); rds_message_addref(rm); + sock_hold(rds_rs_to_sk(rs)); rm->m_rs = rs; /* The code ordering is a little weird, but we're @@ -880,12 +875,13 @@ out: * rds_message is getting to be quite complicated, and we'd like to allocate * it all in one go. This figures out how big it needs to be up front. */ -static int rds_rm_size(struct msghdr *msg, int data_len) +static int rds_rm_size(struct msghdr *msg, int num_sgs) { struct cmsghdr *cmsg; int size = 0; int cmsg_groups = 0; int retval; + bool zcopy_cookie = false; for_each_cmsghdr(cmsg, msg) { if (!CMSG_OK(msg, cmsg)) @@ -904,6 +900,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len) break; + case RDS_CMSG_ZCOPY_COOKIE: + zcopy_cookie = true; + /* fall through */ + case RDS_CMSG_RDMA_DEST: case RDS_CMSG_RDMA_MAP: cmsg_groups |= 2; @@ -924,7 +924,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len) } - size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); + if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie) + return -EINVAL; + + size += num_sgs * sizeof(struct scatterlist); /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */ if (cmsg_groups == 3) @@ -933,6 +936,19 @@ static int rds_rm_size(struct msghdr *msg, int data_len) return size; } +static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm, + struct cmsghdr *cmsg) +{ + u32 *cookie; + + if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)) || + !rm->data.op_mmp_znotifier) + return -EINVAL; + cookie = CMSG_DATA(cmsg); + rm->data.op_mmp_znotifier->z_cookie = *cookie; + return 0; +} + static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, struct msghdr *msg, int *allocated_mr) { @@ -975,6 +991,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, ret = rds_cmsg_atomic(rs, rm, cmsg); break; + case RDS_CMSG_ZCOPY_COOKIE: + ret = rds_cmsg_zcopy(rs, rm, cmsg); + break; + default: return -EINVAL; } @@ -1045,10 +1065,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) long timeo = sock_sndtimeo(sk, nonblock); struct rds_conn_path *cpath; size_t total_payload_len = payload_len, rdma_payload_len = 0; + bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) && + sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY)); + int num_sgs = ceil(payload_len, PAGE_SIZE); /* Mirror Linux UDP mirror of BSD error message compatibility */ /* XXX: Perhaps MSG_MORE someday */ - if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) { + if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) { ret = -EOPNOTSUPP; goto out; } @@ -1092,8 +1115,15 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) goto out; } + if (zcopy) { + if (rs->rs_transport->t_type != RDS_TRANS_TCP) { + ret = -EOPNOTSUPP; + goto out; + } + num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX); + } /* size of rm including all sgs */ - ret = rds_rm_size(msg, payload_len); + ret = rds_rm_size(msg, num_sgs); if (ret < 0) goto out; @@ -1105,12 +1135,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len) /* Attach data to the rm */ if (payload_len) { - rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); + rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs); if (!rm->data.op_sg) { ret = -ENOMEM; goto out; } - ret = rds_message_copy_from_user(rm, &msg->msg_iter); + ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy); if (ret) goto out; } diff --git a/net/rds/tcp.c b/net/rds/tcp.c index 44c4652721af..08230a145042 100644 --- a/net/rds/tcp.c +++ b/net/rds/tcp.c @@ -227,7 +227,6 @@ static void rds_tcp_tc_info(struct socket *rds_sock, unsigned int len, struct rds_tcp_connection *tc; unsigned long flags; struct sockaddr_in sin; - int sinlen; struct socket *sock; spin_lock_irqsave(&rds_tcp_tc_list_lock, flags); @@ -239,12 +238,10 @@ static void rds_tcp_tc_info(struct socket *rds_sock, unsigned int len, sock = tc->t_sock; if (sock) { - sock->ops->getname(sock, (struct sockaddr *)&sin, - &sinlen, 0); + sock->ops->getname(sock, (struct sockaddr *)&sin, 0); tsinfo.local_addr = sin.sin_addr.s_addr; tsinfo.local_port = sin.sin_port; - sock->ops->getname(sock, (struct sockaddr *)&sin, - &sinlen, 1); + sock->ops->getname(sock, (struct sockaddr *)&sin, 1); tsinfo.peer_addr = sin.sin_addr.s_addr; tsinfo.peer_port = sin.sin_port; } |