Diffstat (limited to 'drivers/block/drbd/drbd_worker.c')
 drivers/block/drbd/drbd_worker.c | 194 ++++++++++++++++++++++++++++----------
 1 file changed, 146 insertions(+), 48 deletions(-)
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a2f53a..66be3910e8d2 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
 	return 0;
 }
 
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
 {
-	struct drbd_socket *sock;
-	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
-	struct drbd_conf *mdev = w->mdev;
 	struct p_barrier *p;
+	struct drbd_socket *sock;
 
-	/* really avoid racing with tl_clear. w.cb may have been referenced
-	 * just before it was reassigned and re-queued, so double check that.
-	 * actually, this race was harmless, since we only try to send the
-	 * barrier packet here, and otherwise do nothing with the object.
-	 * but compare with the head of w_clear_epoch */
-	spin_lock_irq(&mdev->tconn->req_lock);
-	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
-		cancel = 1;
-	spin_unlock_irq(&mdev->tconn->req_lock);
-	if (cancel)
-		return 0;
-
-	sock = &mdev->tconn->data;
-	p = drbd_prepare_command(mdev, sock);
+	sock = &tconn->data;
+	p = conn_prepare_command(tconn, sock);
 	if (!p)
 		return -EIO;
-	p->barrier = b->br_number;
-	/* inc_ap_pending was done where this was queued.
-	 * dec_ap_pending will be done in got_BarrierAck
-	 * or (on connection loss) in w_clear_epoch. */
-	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+	p->barrier = tconn->send.current_epoch_nr;
+	p->pad = 0;
+	tconn->send.current_epoch_writes = 0;
+
+	return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
 }
 
 int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 		return 0;
 	}
 
+	if (!tconn->send.seen_any_write_yet) {
+		tconn->send.seen_any_write_yet = true;
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	if (tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	/* this time, no tconn->send.current_epoch_writes++;
+	 * If it was sent, it was the closing barrier for the last
+	 * replicated epoch, before we went into AHEAD mode.
+	 * No more barriers will be sent, until we leave AHEAD mode again. */
+
 	err = drbd_send_out_of_sync(mdev, req);
 	req_mod(req, OOS_HANDED_TO_NETWORK);
 
@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
 		return 0;
 	}
 
+	if (!tconn->send.seen_any_write_yet) {
+		tconn->send.seen_any_write_yet = true;
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	if (tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+	tconn->send.current_epoch_writes++;
+
 	err = drbd_send_dblock(mdev, req);
 	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
 
@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
 {
 	struct drbd_request *req = container_of(w, struct drbd_request, w);
 	struct drbd_conf *mdev = w->mdev;
+	struct drbd_tconn *tconn = mdev->tconn;
 	int err;
 
 	if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
 		return 0;
 	}
 
+	/* Even read requests may close a write epoch,
+	 * if there was any yet. */
+	if (tconn->send.seen_any_write_yet &&
+	    tconn->send.current_epoch_nr != req->epoch) {
+		if (tconn->send.current_epoch_writes)
+			drbd_send_barrier(tconn);
+		tconn->send.current_epoch_nr = req->epoch;
+	}
+
 	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
 				 (unsigned long)req);
 
@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
 	mutex_unlock(mdev->state_mutex);
 }
 
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now. Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+	if (!connection->send.seen_any_write_yet)
+		return false;
+
+	/* Skip barriers that do not contain any writes.
+	 * This may happen during AHEAD mode. */
+	if (!connection->send.current_epoch_writes)
+		return false;
+
+	/* ->req_lock is held when requests are queued on
+	 * connection->sender_work, and put into ->transfer_log.
+	 * It is also held when ->current_tle_nr is increased.
+	 * So either there are already new requests queued,
+	 * and corresponding barriers will be send there.
+	 * Or nothing new is queued yet, so the difference will be 1.
+	 */
+	if (atomic_read(&connection->current_tle_nr) !=
+	    connection->send.current_epoch_nr + 1)
+		return false;
+
+	return true;
+}
+
 bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
 {
 	spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
 	return !list_empty(work_list);
 }
 
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+	DEFINE_WAIT(wait);
+	struct net_conf *nc;
+	int uncork, cork;
+
+	dequeue_work_item(&connection->sender_work, work_list);
+	if (!list_empty(work_list))
+		return;
+
+	/* Still nothing to do?
+	 * Maybe we still need to close the current epoch,
+	 * even if no new requests are queued yet.
+	 *
+	 * Also, poke TCP, just in case.
+	 * Then wait for new work (or signal). */
+	rcu_read_lock();
+	nc = rcu_dereference(connection->net_conf);
+	uncork = nc ? nc->tcp_cork : 0;
+	rcu_read_unlock();
+	if (uncork) {
+		mutex_lock(&connection->data.mutex);
+		if (connection->data.socket)
+			drbd_tcp_uncork(connection->data.socket);
+		mutex_unlock(&connection->data.mutex);
+	}
+
+	for (;;) {
+		int send_barrier;
+		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+		spin_lock_irq(&connection->req_lock);
+		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
+		list_splice_init(&connection->sender_work.q, work_list);
+		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
+		if (!list_empty(work_list) || signal_pending(current)) {
+			spin_unlock_irq(&connection->req_lock);
+			break;
+		}
+		send_barrier = need_to_send_barrier(connection);
+		spin_unlock_irq(&connection->req_lock);
+		if (send_barrier) {
+			drbd_send_barrier(connection);
+			connection->send.current_epoch_nr++;
+		}
+		schedule();
+		/* may be woken up for other things but new work, too,
+		 * e.g. if the current epoch got closed.
+		 * In which case we send the barrier above. */
+	}
+	finish_wait(&connection->sender_work.q_wait, &wait);
+
+	/* someone may have changed the config while we have been waiting above. */
+	rcu_read_lock();
+	nc = rcu_dereference(connection->net_conf);
+	cork = nc ? nc->tcp_cork : 0;
+	rcu_read_unlock();
+	mutex_lock(&connection->data.mutex);
+	if (connection->data.socket) {
+		if (cork)
+			drbd_tcp_cork(connection->data.socket);
+		else if (!uncork)
+			drbd_tcp_uncork(connection->data.socket);
+	}
+	mutex_unlock(&connection->data.mutex);
+}
+
 int drbd_worker(struct drbd_thread *thi)
 {
 	struct drbd_tconn *tconn = thi->tconn;
 	struct drbd_work *w = NULL;
 	struct drbd_conf *mdev;
-	struct net_conf *nc;
 	LIST_HEAD(work_list);
 	int vnr;
-	int cork;
 
 	while (get_t_state(thi) == RUNNING) {
 		drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
 		/* as long as we use drbd_queue_work_front(),
 		 * we may only dequeue single work items here, not batches. */
 		if (list_empty(&work_list))
-			dequeue_work_item(&tconn->sender_work, &work_list);
-
-		/* Still nothing to do? Poke TCP, just in case,
-		 * then wait for new work (or signal). */
-		if (list_empty(&work_list)) {
-			mutex_lock(&tconn->data.mutex);
-			rcu_read_lock();
-			nc = rcu_dereference(tconn->net_conf);
-			cork = nc ? nc->tcp_cork : 0;
-			rcu_read_unlock();
-
-			if (tconn->data.socket && cork)
-				drbd_tcp_uncork(tconn->data.socket);
-			mutex_unlock(&tconn->data.mutex);
-
-			wait_event_interruptible(tconn->sender_work.q_wait,
-				dequeue_work_item(&tconn->sender_work, &work_list));
-
-			mutex_lock(&tconn->data.mutex);
-			if (tconn->data.socket && cork)
-				drbd_tcp_cork(tconn->data.socket);
-			mutex_unlock(&tconn->data.mutex);
-		}
+			wait_for_work(tconn, &work_list);
 
 		if (signal_pending(current)) {
 			flush_signals(current);
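
Taken together, the hunks above replace the queued drbd_tl_epoch barrier works with three per-connection counters: send.seen_any_write_yet, send.current_epoch_nr and send.current_epoch_writes. A P_BARRIER is now emitted lazily by the sender itself, the moment a request from a newer epoch shows up and the epoch being closed actually contained writes. Below is a minimal userspace model of that accounting; send_state, send_write and send_barrier are illustrative stand-ins for the tconn->send fields and the drbd functions, a sketch of the scheme rather than the kernel code itself:

	#include <stdbool.h>
	#include <stdio.h>

	struct send_state {
		bool seen_any_write_yet;
		int  current_epoch_nr;
		int  current_epoch_writes;
	};

	/* Stand-in for drbd_send_barrier(): close the current epoch and
	 * reset the per-epoch write counter, as the patched function does. */
	static void send_barrier(struct send_state *s)
	{
		printf("P_BARRIER closes epoch %d (%d writes)\n",
		       s->current_epoch_nr, s->current_epoch_writes);
		s->current_epoch_writes = 0;
	}

	/* Mirrors the logic added to w_send_dblock(): lazily emit the
	 * barrier for the previous epoch, then account the write to the
	 * current one. */
	static void send_write(struct send_state *s, int req_epoch)
	{
		if (!s->seen_any_write_yet) {
			s->seen_any_write_yet = true;
			s->current_epoch_nr = req_epoch;
		}
		if (s->current_epoch_nr != req_epoch) {
			if (s->current_epoch_writes)
				send_barrier(s);
			s->current_epoch_nr = req_epoch;
		}
		s->current_epoch_writes++;
		printf("P_DATA in epoch %d\n", req_epoch);
	}

	int main(void)
	{
		struct send_state s = { false, 0, 0 };

		send_write(&s, 1);	/* first write ever: opens epoch 1 */
		send_write(&s, 1);	/* same epoch: no barrier */
		send_write(&s, 2);	/* epoch changed: barrier for epoch 1 first */
		return 0;
	}

Reads only participate in the epoch-closing check (the w_send_read_req hunk) and never bump the write counter. need_to_send_barrier() and wait_for_work() cover the idle case: when the resource has already advanced current_tle_nr but no new request has reached the sender, the sender flushes the closing barrier itself before going back to sleep.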
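
On the TCP side, wait_for_work() uncorks the data socket before sleeping, so anything already queued actually hits the wire, and re-corks it once there is work again, so small control packets coalesce. The drbd_tcp_cork()/drbd_tcp_uncork() helpers wrap the Linux TCP_CORK socket option; a plain-socket sketch of the same knob, assuming fd is a connected TCP socket and with error handling elided:

	#include <netinet/in.h>
	#include <netinet/tcp.h>
	#include <sys/socket.h>

	/* on = 1 corresponds to drbd_tcp_cork(): hold back partial frames
	 * so small writes coalesce; on = 0 corresponds to
	 * drbd_tcp_uncork(): push out whatever is currently queued. */
	static void tcp_cork_set(int fd, int on)
	{
		setsockopt(fd, IPPROTO_TCP, TCP_CORK, &on, sizeof(on));
	}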