diff options
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 83 |
1 files changed, 63 insertions, 20 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e3be1d22a247..b9b0e3b5da49 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c @@ -163,6 +163,7 @@ static struct kmem_cache *ceph_msg_data_cache; static char tag_msg = CEPH_MSGR_TAG_MSG; static char tag_ack = CEPH_MSGR_TAG_ACK; static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE; +static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2; #ifdef CONFIG_LOCKDEP static struct lock_class_key socket_class; @@ -176,7 +177,7 @@ static struct lock_class_key socket_class; static void queue_con(struct ceph_connection *con); static void cancel_con(struct ceph_connection *con); -static void con_work(struct work_struct *); +static void ceph_con_workfn(struct work_struct *); static void con_fault(struct ceph_connection *con); /* @@ -276,22 +277,22 @@ static void _ceph_msgr_exit(void) ceph_msgr_wq = NULL; } - ceph_msgr_slab_exit(); - BUG_ON(zero_page == NULL); page_cache_release(zero_page); zero_page = NULL; + + ceph_msgr_slab_exit(); } int ceph_msgr_init(void) { + if (ceph_msgr_slab_init()) + return -ENOMEM; + BUG_ON(zero_page != NULL); zero_page = ZERO_PAGE(0); page_cache_get(zero_page); - if (ceph_msgr_slab_init()) - return -ENOMEM; - /* * The number of active work items is limited by the number of * connections, so leave @max_active at default. @@ -749,7 +750,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); - INIT_DELAYED_WORK(&con->work, con_work); + INIT_DELAYED_WORK(&con->work, ceph_con_workfn); con->state = CON_STATE_CLOSED; } @@ -1351,7 +1352,16 @@ static void prepare_write_keepalive(struct ceph_connection *con) { dout("prepare_write_keepalive %p\n", con); con_out_kvec_reset(con); - con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); + if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) { + struct timespec now = CURRENT_TIME; + + con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2); + ceph_encode_timespec(&con->out_temp_keepalive2, &now); + con_out_kvec_add(con, sizeof(con->out_temp_keepalive2), + &con->out_temp_keepalive2); + } else { + con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive); + } con_flag_set(con, CON_FLAG_WRITE_PENDING); } @@ -1625,6 +1635,12 @@ static void prepare_read_tag(struct ceph_connection *con) con->in_tag = CEPH_MSGR_TAG_READY; } +static void prepare_read_keepalive_ack(struct ceph_connection *con) +{ + dout("prepare_read_keepalive_ack %p\n", con); + con->in_base_pos = 0; +} + /* * Prepare to read a message. */ @@ -2322,13 +2338,6 @@ static int read_partial_message(struct ceph_connection *con) return ret; BUG_ON(!con->in_msg ^ skip); - if (con->in_msg && data_len > con->in_msg->data_length) { - pr_warn("%s skipping long message (%u > %zd)\n", - __func__, data_len, con->in_msg->data_length); - ceph_msg_put(con->in_msg); - con->in_msg = NULL; - skip = 1; - } if (skip) { /* skip this message */ dout("alloc_msg said skip message\n"); @@ -2457,6 +2466,17 @@ static void process_message(struct ceph_connection *con) mutex_lock(&con->mutex); } +static int read_keepalive_ack(struct ceph_connection *con) +{ + struct ceph_timespec ceph_ts; + size_t size = sizeof(ceph_ts); + int ret = read_partial(con, size, size, &ceph_ts); + if (ret <= 0) + return ret; + ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts); + prepare_read_tag(con); + return 1; +} /* * Write something to the socket. Called in a worker thread when the @@ -2526,6 +2546,10 @@ more_kvec: do_next: if (con->state == CON_STATE_OPEN) { + if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { + prepare_write_keepalive(con); + goto more; + } /* is anything else pending? */ if (!list_empty(&con->out_queue)) { prepare_write_message(con); @@ -2535,10 +2559,6 @@ do_next: prepare_write_ack(con); goto more; } - if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) { - prepare_write_keepalive(con); - goto more; - } } /* Nothing to do! */ @@ -2641,6 +2661,9 @@ more: case CEPH_MSGR_TAG_ACK: prepare_read_ack(con); break; + case CEPH_MSGR_TAG_KEEPALIVE2_ACK: + prepare_read_keepalive_ack(con); + break; case CEPH_MSGR_TAG_CLOSE: con_close_socket(con); con->state = CON_STATE_CLOSED; @@ -2684,6 +2707,12 @@ more: process_ack(con); goto more; } + if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) { + ret = read_keepalive_ack(con); + if (ret <= 0) + goto out; + goto more; + } out: dout("try_read done on %p ret %d\n", con, ret); @@ -2799,7 +2828,7 @@ static void con_fault_finish(struct ceph_connection *con) /* * Do some work on a connection. Drop a connection ref when we're done. */ -static void con_work(struct work_struct *work) +static void ceph_con_workfn(struct work_struct *work) { struct ceph_connection *con = container_of(work, struct ceph_connection, work.work); @@ -3101,6 +3130,20 @@ void ceph_con_keepalive(struct ceph_connection *con) } EXPORT_SYMBOL(ceph_con_keepalive); +bool ceph_con_keepalive_expired(struct ceph_connection *con, + unsigned long interval) +{ + if (interval > 0 && + (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { + struct timespec now = CURRENT_TIME; + struct timespec ts; + jiffies_to_timespec(interval, &ts); + ts = timespec_add(con->last_keepalive_ack, ts); + return timespec_compare(&now, &ts) >= 0; + } + return false; +} + static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type) { struct ceph_msg_data *data; |