diff options
-rw-r--r-- | include/linux/ceph/messenger.h | 7 | ||||
-rw-r--r-- | net/ceph/messenger.c | 138 |
2 files changed, 135 insertions, 10 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 5860dd0c2caf..14862438faff 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h @@ -95,6 +95,12 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type) } } +struct ceph_msg_data_cursor { + bool last_piece; /* now at last piece of data item */ + struct page *page; /* current page in pagelist */ + size_t offset; /* pagelist bytes consumed */ +}; + struct ceph_msg_data { enum ceph_msg_data_type type; union { @@ -112,6 +118,7 @@ struct ceph_msg_data { }; struct ceph_pagelist *pagelist; }; + struct ceph_msg_data_cursor cursor; /* pagelist only */ }; /* diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index f256b4b174ad..b978cf8b27ff 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -21,6 +21,9 @@ #include <linux/ceph/pagelist.h> #include <linux/export.h> +#define list_entry_next(pos, member) \ + list_entry(pos->member.next, typeof(*pos), member) + /* * Ceph uses the messenger to exchange ceph_msg messages with other * hosts in the system. The messenger provides ordered and reliable @@ -738,6 +741,109 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg) } #endif +/* + * Message data is handled (sent or received) in pieces, where each + * piece resides on a single page. The network layer might not + * consume an entire piece at once. A data item's cursor keeps + * track of which piece is next to process and how much remains to + * be processed in that piece. It also tracks whether the current + * piece is the last one in the data item. + */ +static void ceph_msg_data_cursor_init(struct ceph_msg_data *data) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + struct page *page; + + if (data->type != CEPH_MSG_DATA_PAGELIST) + return; + + pagelist = data->pagelist; + BUG_ON(!pagelist); + if (!pagelist->length) + return; /* pagelist can be assigned but empty */ + + BUG_ON(list_empty(&pagelist->head)); + page = list_first_entry(&pagelist->head, struct page, lru); + + cursor->page = page; + cursor->offset = 0; + cursor->last_piece = pagelist->length <= PAGE_SIZE; +} + +/* + * Return the page containing the next piece to process for a given + * data item, and supply the page offset and length of that piece. + * Indicate whether this is the last piece in this data item. + */ +static struct page *ceph_msg_data_next(struct ceph_msg_data *data, + size_t *page_offset, + size_t *length, + bool *last_piece) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + size_t piece_end; + + BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); + + pagelist = data->pagelist; + BUG_ON(!pagelist); + + BUG_ON(!cursor->page); + BUG_ON(cursor->offset >= pagelist->length); + + *last_piece = cursor->last_piece; + if (*last_piece) { + /* pagelist offset is always 0 */ + piece_end = pagelist->length & ~PAGE_MASK; + if (!piece_end) + piece_end = PAGE_SIZE; + } else { + piece_end = PAGE_SIZE; + } + *page_offset = cursor->offset & ~PAGE_MASK; + *length = piece_end - *page_offset; + + return data->cursor.page; +} + +/* + * Returns true if the result moves the cursor on to the next piece + * (the next page) of the pagelist. + */ +static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes) +{ + struct ceph_msg_data_cursor *cursor = &data->cursor; + struct ceph_pagelist *pagelist; + + BUG_ON(data->type != CEPH_MSG_DATA_PAGELIST); + + pagelist = data->pagelist; + BUG_ON(!pagelist); + BUG_ON(!cursor->page); + BUG_ON(cursor->offset + bytes > pagelist->length); + BUG_ON((cursor->offset & ~PAGE_MASK) + bytes > PAGE_SIZE); + + /* Advance the cursor offset */ + + cursor->offset += bytes; + /* pagelist offset is always 0 */ + if (!bytes || cursor->offset & ~PAGE_MASK) + return false; /* more bytes to process in the current page */ + + /* Move on to the next page */ + + BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); + cursor->page = list_entry_next(cursor->page, lru); + + /* cursor offset is at page boundary; pagelist offset is always 0 */ + if (pagelist->length - cursor->offset <= PAGE_SIZE) + cursor->last_piece = true; + + return true; +} + static void prepare_message_data(struct ceph_msg *msg, struct ceph_msg_pos *msg_pos) { @@ -755,6 +861,12 @@ static void prepare_message_data(struct ceph_msg *msg, init_bio_iter(msg->b.bio, &msg->b.bio_iter, &msg->b.bio_seg); #endif msg_pos->data_pos = 0; + + /* If there's a trail, initialize its cursor */ + + if (ceph_msg_has_trail(msg)) + ceph_msg_data_cursor_init(&msg->t); + msg_pos->did_page_crc = false; } @@ -1045,6 +1157,12 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, msg_pos->data_pos += sent; msg_pos->page_pos += sent; + if (in_trail) { + bool need_crc; + + need_crc = ceph_msg_data_advance(&msg->t, sent); + BUG_ON(need_crc && sent != len); + } if (sent < len) return; @@ -1052,10 +1170,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page, msg_pos->page_pos = 0; msg_pos->page++; msg_pos->did_page_crc = false; - if (in_trail) { - BUG_ON(!ceph_msg_has_trail(msg)); - list_rotate_left(&msg->t.pagelist->head); - } else if (ceph_msg_has_pagelist(msg)) { + if (ceph_msg_has_pagelist(msg)) { list_rotate_left(&msg->l.pagelist->head); #ifdef CONFIG_BLOCK } else if (ceph_msg_has_bio(msg)) { @@ -1141,6 +1256,8 @@ static int write_partial_message_data(struct ceph_connection *con) size_t length; int max_write = PAGE_SIZE; int bio_offset = 0; + bool use_cursor = false; + bool last_piece = true; /* preserve existing behavior */ in_trail = in_trail || msg_pos->data_pos >= trail_off; if (!in_trail) @@ -1148,9 +1265,9 @@ static int write_partial_message_data(struct ceph_connection *con) if (in_trail) { BUG_ON(!ceph_msg_has_trail(msg)); - total_max_write = data_len - msg_pos->data_pos; - page = list_first_entry(&msg->t.pagelist->head, - struct page, lru); + use_cursor = true; + page = ceph_msg_data_next(&msg->t, &page_offset, + &length, &last_piece); } else if (ceph_msg_has_pages(msg)) { page = msg->p.pages[msg_pos->page]; } else if (ceph_msg_has_pagelist(msg)) { @@ -1168,8 +1285,9 @@ static int write_partial_message_data(struct ceph_connection *con) } else { page = zero_page; } - length = min_t(int, max_write - msg_pos->page_pos, - total_max_write); + if (!use_cursor) + length = min_t(int, max_write - msg_pos->page_pos, + total_max_write); page_offset = msg_pos->page_pos + bio_offset; if (do_datacrc && !msg_pos->did_page_crc) { @@ -1180,7 +1298,7 @@ static int write_partial_message_data(struct ceph_connection *con) msg_pos->did_page_crc = true; } ret = ceph_tcp_sendpage(con->sock, page, page_offset, - length, true); + length, last_piece); if (ret <= 0) goto out; |