diff options
Diffstat (limited to 'fs/dlm/lowcomms.c')
-rw-r--r-- | fs/dlm/lowcomms.c | 179 |
1 files changed, 85 insertions, 94 deletions
diff --git a/fs/dlm/lowcomms.c b/fs/dlm/lowcomms.c index 96f84541867c..b7b7360be609 100644 --- a/fs/dlm/lowcomms.c +++ b/fs/dlm/lowcomms.c @@ -65,40 +65,6 @@ #define MAX_SEND_MSG_COUNT 25 #define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000) -struct cbuf { - unsigned int base; - unsigned int len; - unsigned int mask; -}; - -static void cbuf_add(struct cbuf *cb, int n) -{ - cb->len += n; -} - -static int cbuf_data(struct cbuf *cb) -{ - return ((cb->base + cb->len) & cb->mask); -} - -static void cbuf_init(struct cbuf *cb, int size) -{ - cb->base = cb->len = 0; - cb->mask = size-1; -} - -static void cbuf_eat(struct cbuf *cb, int n) -{ - cb->len -= n; - cb->base += n; - cb->base &= cb->mask; -} - -static bool cbuf_empty(struct cbuf *cb) -{ - return cb->len == 0; -} - struct connection { struct socket *sock; /* NULL if not connected */ uint32_t nodeid; /* So we know who we are in the list */ @@ -117,8 +83,6 @@ struct connection { int (*rx_action) (struct connection *); /* What to do when active */ void (*connect_action) (struct connection *); /* What to do to connect */ void (*shutdown_action)(struct connection *con); /* What to do to shutdown */ - struct page *rx_page; - struct cbuf cb; int retries; #define MAX_CONNECT_RETRIES 3 struct hlist_node list; @@ -126,6 +90,9 @@ struct connection { struct work_struct rwork; /* Receive workqueue */ struct work_struct swork; /* Send workqueue */ wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */ + unsigned char *rx_buf; + int rx_buflen; + int rx_leftover; struct rcu_head rcu; }; #define sock2con(x) ((struct connection *)(x)->sk_user_data) @@ -219,6 +186,13 @@ static struct connection *nodeid2con(int nodeid, gfp_t alloc) if (!con) return NULL; + con->rx_buflen = dlm_config.ci_buffer_size; + con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); + if (!con->rx_buf) { + kfree(con); + return NULL; + } + con->nodeid = nodeid; mutex_init(&con->sock_mutex); INIT_LIST_HEAD(&con->writequeue); @@ -613,11 +587,8 @@ static void close_connection(struct connection *con, bool and_other, /* Will only re-enter once. */ close_connection(con->othercon, false, true, true); } - if (con->rx_page) { - __free_page(con->rx_page); - con->rx_page = NULL; - } + con->rx_leftover = 0; con->retries = 0; mutex_unlock(&con->sock_mutex); clear_bit(CF_CLOSING, &con->flags); @@ -671,16 +642,33 @@ static void dlm_tcp_shutdown(struct connection *con) shutdown_connection(con); } +static int con_realloc_receive_buf(struct connection *con, int newlen) +{ + unsigned char *newbuf; + + newbuf = kmalloc(newlen, GFP_NOFS); + if (!newbuf) + return -ENOMEM; + + /* copy any leftover from last receive */ + if (con->rx_leftover) + memmove(newbuf, con->rx_buf, con->rx_leftover); + + /* swap to new buffer space */ + kfree(con->rx_buf); + con->rx_buflen = newlen; + con->rx_buf = newbuf; + + return 0; +} + /* Data received from remote end */ static int receive_from_sock(struct connection *con) { - int ret = 0; - struct msghdr msg = {}; - struct kvec iov[2]; - unsigned len; - int r; int call_again_soon = 0; - int nvec; + struct msghdr msg; + struct kvec iov; + int ret, buflen; mutex_lock(&con->sock_mutex); @@ -688,71 +676,55 @@ static int receive_from_sock(struct connection *con) ret = -EAGAIN; goto out_close; } + if (con->nodeid == 0) { ret = -EINVAL; goto out_close; } - if (con->rx_page == NULL) { - /* - * This doesn't need to be atomic, but I think it should - * improve performance if it is. - */ - con->rx_page = alloc_page(GFP_ATOMIC); - if (con->rx_page == NULL) + /* realloc if we get new buffer size to read out */ + buflen = dlm_config.ci_buffer_size; + if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { + ret = con_realloc_receive_buf(con, buflen); + if (ret < 0) goto out_resched; - cbuf_init(&con->cb, PAGE_SIZE); } - /* - * iov[0] is the bit of the circular buffer between the current end - * point (cb.base + cb.len) and the end of the buffer. + /* calculate new buffer parameter regarding last receive and + * possible leftover bytes */ - iov[0].iov_len = con->cb.base - cbuf_data(&con->cb); - iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb); - iov[1].iov_len = 0; - nvec = 1; + iov.iov_base = con->rx_buf + con->rx_leftover; + iov.iov_len = con->rx_buflen - con->rx_leftover; - /* - * iov[1] is the bit of the circular buffer between the start of the - * buffer and the start of the currently used section (cb.base) - */ - if (cbuf_data(&con->cb) >= con->cb.base) { - iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb); - iov[1].iov_len = con->cb.base; - iov[1].iov_base = page_address(con->rx_page); - nvec = 2; - } - len = iov[0].iov_len + iov[1].iov_len; - iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len); - - r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL); + memset(&msg, 0, sizeof(msg)); + msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL; + ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, + msg.msg_flags); if (ret <= 0) goto out_close; - else if (ret == len) + else if (ret == iov.iov_len) call_again_soon = 1; - cbuf_add(&con->cb, ret); - ret = dlm_process_incoming_buffer(con->nodeid, - page_address(con->rx_page), - con->cb.base, con->cb.len, - PAGE_SIZE); - if (ret < 0) { - log_print("lowcomms err %d: addr=%p, base=%u, len=%u, read=%d", - ret, page_address(con->rx_page), con->cb.base, - con->cb.len, r); - cbuf_eat(&con->cb, r); - } else { - cbuf_eat(&con->cb, ret); - } + /* new buflen according readed bytes and leftover from last receive */ + buflen = ret + con->rx_leftover; + ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); + if (ret < 0) + goto out_close; - if (cbuf_empty(&con->cb) && !call_again_soon) { - __free_page(con->rx_page); - con->rx_page = NULL; + /* calculate leftover bytes from process and put it into begin of + * the receive buffer, so next receive we have the full message + * at the start address of the receive buffer. + */ + con->rx_leftover = buflen - ret; + if (con->rx_leftover) { + memmove(con->rx_buf, con->rx_buf + ret, + con->rx_leftover); + call_again_soon = true; } if (call_again_soon) goto out_resched; + mutex_unlock(&con->sock_mutex); return 0; @@ -854,6 +826,17 @@ static int accept_from_sock(struct connection *con) result = -ENOMEM; goto accept_err; } + + othercon->rx_buflen = dlm_config.ci_buffer_size; + othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); + if (!othercon->rx_buf) { + mutex_unlock(&newcon->sock_mutex); + kfree(othercon); + log_print("failed to allocate incoming socket receive buffer"); + result = -ENOMEM; + goto accept_err; + } + othercon->nodeid = nodeid; othercon->rx_action = receive_from_sock; mutex_init(&othercon->sock_mutex); @@ -1603,6 +1586,14 @@ static void shutdown_conn(struct connection *con) con->shutdown_action(con); } +static void connection_release(struct rcu_head *rcu) +{ + struct connection *con = container_of(rcu, struct connection, rcu); + + kfree(con->rx_buf); + kfree(con); +} + static void free_conn(struct connection *con) { close_connection(con, true, true, true); @@ -1611,10 +1602,10 @@ static void free_conn(struct connection *con) spin_unlock(&connections_lock); if (con->othercon) { clean_one_writequeue(con->othercon); - kfree_rcu(con->othercon, rcu); + call_rcu(&con->othercon->rcu, connection_release); } clean_one_writequeue(con); - kfree_rcu(con, rcu); + call_rcu(&con->rcu, connection_release); } static void work_flush(void) |