summaryrefslogtreecommitdiffstats
path: root/drivers/block/drbd/drbd_worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/block/drbd/drbd_worker.c')
-rw-r--r--drivers/block/drbd/drbd_worker.c194
1 files changed, 146 insertions, 48 deletions
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 39ece3a2f53a..66be3910e8d2 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1210,34 +1210,25 @@ int w_prev_work_done(struct drbd_work *w, int cancel)
return 0;
}
-int w_send_barrier(struct drbd_work *w, int cancel)
+/* FIXME
+ * We need to track the number of pending barrier acks,
+ * and to be able to wait for them.
+ * See also comment in drbd_adm_attach before drbd_suspend_io.
+ */
+int drbd_send_barrier(struct drbd_tconn *tconn)
{
- struct drbd_socket *sock;
- struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
- struct drbd_conf *mdev = w->mdev;
struct p_barrier *p;
+ struct drbd_socket *sock;
- /* really avoid racing with tl_clear. w.cb may have been referenced
- * just before it was reassigned and re-queued, so double check that.
- * actually, this race was harmless, since we only try to send the
- * barrier packet here, and otherwise do nothing with the object.
- * but compare with the head of w_clear_epoch */
- spin_lock_irq(&mdev->tconn->req_lock);
- if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
- cancel = 1;
- spin_unlock_irq(&mdev->tconn->req_lock);
- if (cancel)
- return 0;
-
- sock = &mdev->tconn->data;
- p = drbd_prepare_command(mdev, sock);
+ sock = &tconn->data;
+ p = conn_prepare_command(tconn, sock);
if (!p)
return -EIO;
- p->barrier = b->br_number;
- /* inc_ap_pending was done where this was queued.
- * dec_ap_pending will be done in got_BarrierAck
- * or (on connection loss) in w_clear_epoch. */
- return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
+ p->barrier = tconn->send.current_epoch_nr;
+ p->pad = 0;
+ tconn->send.current_epoch_writes = 0;
+
+ return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
}
int w_send_write_hint(struct drbd_work *w, int cancel)
@@ -1257,6 +1248,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1264,6 +1256,20 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ /* this time, no tconn->send.current_epoch_writes++;
+ * If it was sent, it was the closing barrier for the last
+ * replicated epoch, before we went into AHEAD mode.
+ * No more barriers will be sent, until we leave AHEAD mode again. */
+
err = drbd_send_out_of_sync(mdev, req);
req_mod(req, OOS_HANDED_TO_NETWORK);
@@ -1280,6 +1286,7 @@ int w_send_dblock(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1287,6 +1294,17 @@ int w_send_dblock(struct drbd_work *w, int cancel)
return 0;
}
+ if (!tconn->send.seen_any_write_yet) {
+ tconn->send.seen_any_write_yet = true;
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ if (tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+ tconn->send.current_epoch_writes++;
+
err = drbd_send_dblock(mdev, req);
req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1303,6 +1321,7 @@ int w_send_read_req(struct drbd_work *w, int cancel)
{
struct drbd_request *req = container_of(w, struct drbd_request, w);
struct drbd_conf *mdev = w->mdev;
+ struct drbd_tconn *tconn = mdev->tconn;
int err;
if (unlikely(cancel)) {
@@ -1310,6 +1329,15 @@ int w_send_read_req(struct drbd_work *w, int cancel)
return 0;
}
+ /* Even read requests may close a write epoch,
+ * if there was any yet. */
+ if (tconn->send.seen_any_write_yet &&
+ tconn->send.current_epoch_nr != req->epoch) {
+ if (tconn->send.current_epoch_writes)
+ drbd_send_barrier(tconn);
+ tconn->send.current_epoch_nr = req->epoch;
+ }
+
err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
(unsigned long)req);
@@ -1673,6 +1701,34 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
mutex_unlock(mdev->state_mutex);
}
+/* If the resource already closed the current epoch, but we did not
+ * (because we have not yet seen new requests), we should send the
+ * corresponding barrier now. Must be checked within the same spinlock
+ * that is used to check for new requests. */
+bool need_to_send_barrier(struct drbd_tconn *connection)
+{
+ if (!connection->send.seen_any_write_yet)
+ return false;
+
+ /* Skip barriers that do not contain any writes.
+ * This may happen during AHEAD mode. */
+ if (!connection->send.current_epoch_writes)
+ return false;
+
+ /* ->req_lock is held when requests are queued on
+ * connection->sender_work, and put into ->transfer_log.
+ * It is also held when ->current_tle_nr is increased.
+ * So either there are already new requests queued,
+ * and corresponding barriers will be send there.
+ * Or nothing new is queued yet, so the difference will be 1.
+ */
+ if (atomic_read(&connection->current_tle_nr) !=
+ connection->send.current_epoch_nr + 1)
+ return false;
+
+ return true;
+}
+
bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
spin_lock_irq(&queue->q_lock);
@@ -1690,15 +1746,79 @@ bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_lis
return !list_empty(work_list);
}
+void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
+{
+ DEFINE_WAIT(wait);
+ struct net_conf *nc;
+ int uncork, cork;
+
+ dequeue_work_item(&connection->sender_work, work_list);
+ if (!list_empty(work_list))
+ return;
+
+ /* Still nothing to do?
+ * Maybe we still need to close the current epoch,
+ * even if no new requests are queued yet.
+ *
+ * Also, poke TCP, just in case.
+ * Then wait for new work (or signal). */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ uncork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ if (uncork) {
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket)
+ drbd_tcp_uncork(connection->data.socket);
+ mutex_unlock(&connection->data.mutex);
+ }
+
+ for (;;) {
+ int send_barrier;
+ prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
+ spin_lock_irq(&connection->req_lock);
+ spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ list_splice_init(&connection->sender_work.q, work_list);
+ spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
+ if (!list_empty(work_list) || signal_pending(current)) {
+ spin_unlock_irq(&connection->req_lock);
+ break;
+ }
+ send_barrier = need_to_send_barrier(connection);
+ spin_unlock_irq(&connection->req_lock);
+ if (send_barrier) {
+ drbd_send_barrier(connection);
+ connection->send.current_epoch_nr++;
+ }
+ schedule();
+ /* may be woken up for other things but new work, too,
+ * e.g. if the current epoch got closed.
+ * In which case we send the barrier above. */
+ }
+ finish_wait(&connection->sender_work.q_wait, &wait);
+
+ /* someone may have changed the config while we have been waiting above. */
+ rcu_read_lock();
+ nc = rcu_dereference(connection->net_conf);
+ cork = nc ? nc->tcp_cork : 0;
+ rcu_read_unlock();
+ mutex_lock(&connection->data.mutex);
+ if (connection->data.socket) {
+ if (cork)
+ drbd_tcp_cork(connection->data.socket);
+ else if (!uncork)
+ drbd_tcp_uncork(connection->data.socket);
+ }
+ mutex_unlock(&connection->data.mutex);
+}
+
int drbd_worker(struct drbd_thread *thi)
{
struct drbd_tconn *tconn = thi->tconn;
struct drbd_work *w = NULL;
struct drbd_conf *mdev;
- struct net_conf *nc;
LIST_HEAD(work_list);
int vnr;
- int cork;
while (get_t_state(thi) == RUNNING) {
drbd_thread_current_set_cpu(thi);
@@ -1706,29 +1826,7 @@ int drbd_worker(struct drbd_thread *thi)
/* as long as we use drbd_queue_work_front(),
* we may only dequeue single work items here, not batches. */
if (list_empty(&work_list))
- dequeue_work_item(&tconn->sender_work, &work_list);
-
- /* Still nothing to do? Poke TCP, just in case,
- * then wait for new work (or signal). */
- if (list_empty(&work_list)) {
- mutex_lock(&tconn->data.mutex);
- rcu_read_lock();
- nc = rcu_dereference(tconn->net_conf);
- cork = nc ? nc->tcp_cork : 0;
- rcu_read_unlock();
-
- if (tconn->data.socket && cork)
- drbd_tcp_uncork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
-
- wait_event_interruptible(tconn->sender_work.q_wait,
- dequeue_work_item(&tconn->sender_work, &work_list));
-
- mutex_lock(&tconn->data.mutex);
- if (tconn->data.socket && cork)
- drbd_tcp_cork(tconn->data.socket);
- mutex_unlock(&tconn->data.mutex);
- }
+ wait_for_work(tconn, &work_list);
if (signal_pending(current)) {
flush_signals(current);