summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- include/linux/ceph/messenger.h 7
-rw-r--r-- net/ceph/messenger.c 138
2 files changed, 135 insertions, 10 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 5860dd0c2caf..14862438faff 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
}
}
+struct ceph_msg_data_cursor { /* progress marker for walking a pagelist data item piece by piece */
+ bool last_piece; /* true once the current piece is the final one of the data item */
+ struct page *page; /* page of the pagelist that holds the current piece */
+ size_t offset; /* pagelist bytes consumed so far, counted from the start of the pagelist */
+};
+
struct ceph_msg_data {
enum ceph_msg_data_type type;
union {
@@ -112,6 +118,7 @@ struct ceph_msg_data {
};
struct ceph_pagelist *pagelist;
};
+ struct ceph_msg_data_cursor cursor; /* pagelist only */
};
/*
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f256b4b174ad..b978cf8b27ff 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -21,6 +21,9 @@
#include <linux/ceph/pagelist.h>
#include <linux/export.h>
+/*
+ * Given @pos, a pointer to a structure that is linked into a list
+ * through its list_head field @member, evaluate to the structure
+ * that contains the following list node.  The macro does not check
+ * whether that node is the list head, so the caller must already
+ * know that @pos is not the last entry.
+ *
+ * @pos is parenthesized so that any pointer expression may be passed.
+ */
+#define list_entry_next(pos, member)					\
+	list_entry((pos)->member.next, typeof(*(pos)), member)
+
/*
* Ceph uses the messenger to exchange ceph_msg messages with other
* hosts in the system. The messenger provides ordered and reliable
@@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
}
#endif
+/*
+ * A message data item is sent or received one piece at a time, and
+ * every piece lives entirely within one page.  Because the network
+ * layer is free to consume only part of a piece, each data item has
+ * a cursor that records which piece comes next, how far into that
+ * piece processing has progressed, and whether that piece is the
+ * final one of the data item.
+ *
+ * This sets the cursor of a pagelist data item to the start of its
+ * first page.  Data items of any other type, and pagelists whose
+ * length is zero, are left untouched.
+ */
+static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
+{
+	struct ceph_pagelist *pl;
+
+	/* Nothing to set up unless this is a pagelist data item */
+	if (data->type != CEPH_MSG_DATA_PAGELIST)
+		return;
+
+	pl = data->pagelist;
+	BUG_ON(!pl);
+
+	/* A pagelist may be attached yet hold no data at all */
+	if (pl->length == 0)
+		return;
+
+	/* A non-zero length implies at least one page on the list */
+	BUG_ON(list_empty(&pl->head));
+
+	data->cursor.page = list_first_entry(&pl->head, struct page, lru);
+	data->cursor.offset = 0;
+	/* A single page suffices when everything fits within PAGE_SIZE */
+	data->cursor.last_piece = pl->length <= PAGE_SIZE;
+}
+
+/*
+ * Look up the piece the cursor of a pagelist data item currently
+ * points at.  The page holding that piece is returned; the offset
+ * of the unprocessed bytes within the page is stored in
+ * *page_offset, their count in *length, and *last_piece reports
+ * whether this is the final piece of the data item.
+ *
+ * The cursor itself is not modified.
+ */
+static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
+					size_t *page_offset,
+					size_t *length,
+					bool *last_piece)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct ceph_pagelist *pagelist;
+	size_t piece_end = PAGE_SIZE;	/* every piece but the last fills its page */
+	size_t in_page;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+	pagelist = data->pagelist;
+	BUG_ON(!pagelist);
+
+	BUG_ON(!cursor->page);
+	BUG_ON(cursor->offset >= pagelist->length);
+
+	*last_piece = cursor->last_piece;
+	if (cursor->last_piece) {
+		/*
+		 * The pagelist starts at offset 0 of its first page, so
+		 * the data ends at (length mod PAGE_SIZE) within the last
+		 * page; a remainder of zero means the page is full.
+		 */
+		size_t tail = pagelist->length & ~PAGE_MASK;
+
+		if (tail)
+			piece_end = tail;
+	}
+
+	in_page = cursor->offset & ~PAGE_MASK;
+	*page_offset = in_page;
+	*length = piece_end - in_page;
+
+	return cursor->page;
+}
+
+/*
+ * Record that @bytes bytes of the current piece of a pagelist data
+ * item have been processed.  @bytes must not run past the end of
+ * the current page nor past the end of the pagelist.
+ *
+ * Returns true if the result moves the cursor on to the next piece
+ * (the next page) of the pagelist.  Returns false if bytes remain
+ * in the current page, if @bytes is zero, or if the whole pagelist
+ * has now been consumed (there is no further piece to move to).
+ */
+static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct ceph_pagelist *pagelist;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST);
+
+	pagelist = data->pagelist;
+	BUG_ON(!pagelist);
+	BUG_ON(!cursor->page);
+	BUG_ON(cursor->offset + bytes > pagelist->length);
+	BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE);
+
+	/* Advance the cursor offset */
+
+	cursor->offset += bytes;
+	/* pagelist offset is always 0 */
+	if (!bytes || cursor->offset & ~PAGE_MASK)
+		return false;	/* more bytes to process in the current page */
+
+	/*
+	 * The offset sits on a page boundary.  If the pagelist length is
+	 * an exact multiple of PAGE_SIZE this boundary can also be the
+	 * end of the data, in which case the current page is the last
+	 * one on the list and there is nothing to step to.
+	 */
+	if (cursor->offset == pagelist->length)
+		return false;	/* no more data */
+
+	/* Move on to the next page */
+
+	BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
+	cursor->page = list_entry_next(cursor->page, lru);
+
+	/* cursor offset is at page boundary; pagelist offset is always 0 */
+	if (pagelist->length - cursor->offset <= PAGE_SIZE)
+		cursor->last_piece = true;
+
+	return true;
+}
+
static void prepare_message_data(struct ceph_msg *msg,
struct ceph_msg_pos *msg_pos)
{
@@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg,
init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg);
#endif
msg_pos->data_pos = 0;
+
+ /* If there's a trail, initialize its cursor */
+
+ if (ceph_msg_has_trail(msg))
+ ceph_msg_data_cursor_init(&msg->t);
+
msg_pos->did_page_crc = false;
}
@@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->data_pos += sent;
msg_pos->page_pos += sent;
+ if (in_trail) {
+ bool need_crc;
+
+ need_crc = ceph_msg_data_advance(&msg->t, sent);
+ BUG_ON(need_crc && sent != len);
+ }
if (sent < len)
return;
@@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
msg_pos->page_pos = 0;
msg_pos->page++;
msg_pos->did_page_crc = false;
- if (in_trail) {
- BUG_ON(!ceph_msg_has_trail(msg));
- list_rotate_left(&msg->t.pagelist->head);
- } else if (ceph_msg_has_pagelist(msg)) {
+ if (ceph_msg_has_pagelist(msg)) {
list_rotate_left(&msg->l.pagelist->head);
#ifdef CONFIG_BLOCK
} else if (ceph_msg_has_bio(msg)) {
@@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con)
size_t length;
int max_write = PAGE_SIZE;
int bio_offset = 0;
+ bool use_cursor = false;
+ bool last_piece = true; /* preserve existing behavior */
in_trail = in_trail || msg_pos->data_pos >= trail_off;
if (!in_trail)
@@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con)
if (in_trail) {
BUG_ON(!ceph_msg_has_trail(msg));
- total_max_write = data_len - msg_pos->data_pos;
- page = list_first_entry(&msg->t.pagelist->head,
- struct page, lru);
+ use_cursor = true;
+ page = ceph_msg_data_next(&msg->t, &page_offset,
+ &length, &last_piece);
} else if (ceph_msg_has_pages(msg)) {
page = msg->p.pages[msg_pos->page];
} else if (ceph_msg_has_pagelist(msg)) {
@@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con)
} else {
page = zero_page;
}
- length = min_t(int, max_write - msg_pos->page_pos,
- total_max_write);
+ if (!use_cursor)
+ length = min_t(int, max_write - msg_pos->page_pos,
+ total_max_write);
page_offset = msg_pos->page_pos + bio_offset;
if (do_datacrc && !msg_pos->did_page_crc) {
@@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con)
msg_pos->did_page_crc = true;
}
ret = ceph_tcp_sendpage(con->sock, page, page_offset,
- length, true);
+ length, last_piece);
if (ret <= 0)
goto out;