summaryrefslogtreecommitdiffstats
path: root/net/rxrpc/recvmsg.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2022-08-27 14:27:56 +0100
committerDavid Howells <dhowells@redhat.com>2022-11-08 16:42:28 +0000
commit5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb (patch)
treebb10e432f005b9988c542d079a1c6f0200b4fc64 /net/rxrpc/recvmsg.c
parentd4d02d8bb5c412d977af7ea7c7ea91977a6a64dc (diff)
downloadlinux-stable-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.tar.gz
linux-stable-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.tar.bz2
linux-stable-5d7edbc9231ec6b60f9c5b7e7980e9a1cd92e6bb.zip
rxrpc: Get rid of the Rx ring
Get rid of the Rx ring and replace it with a pair of queues instead. One queue gets the packets that are in-sequence and are ready for processing by recvmsg(); the other queue gets the out-of-sequence packets for addition to the first queue as the holes get filled. The annotation ring is removed and replaced with a SACK table. The SACK table has the bits set that correspond exactly to the sequence number of the packet being acked. The SACK ring is copied when an ACK packet is being assembled and rotated so that the first ACK is in byte 0. Flow control handling is altered so that packets that are moved to the in-sequence queue are hard-ACK'd even before they're consumed - and then the Rx window size in the ACK packet (rsize) is shrunk down to compensate (even going to 0 if the window is full). Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/recvmsg.c')
-rw-r--r--net/rxrpc/recvmsg.c117
1 files changed, 60 insertions, 57 deletions
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 401aae687830..efb85f983657 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -173,7 +173,8 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
break;
}
- trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
+ trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
+ lower_32_bits(atomic64_read(&call->ackr_window)) - 1,
call->rx_pkt_offset, call->rx_pkt_len, ret);
return ret;
}
@@ -183,10 +184,11 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
*/
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
+ rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
+
_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
- trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
- ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
+ trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
@@ -220,45 +222,53 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top;
- bool last = false;
- int ix;
+ rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
+ bool last;
+ int acked;
_enter("%d", call->debug_id);
- hard_ack = call->rx_hard_ack;
- top = smp_load_acquire(&call->rx_top);
- ASSERT(before(hard_ack, top));
-
- hard_ack++;
- ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
- skb = call->rxtx_buffer[ix];
+further_rotation:
+ skb = skb_dequeue(&call->recvmsg_queue);
rxrpc_see_skb(skb, rxrpc_skb_rotated);
- sp = rxrpc_skb(skb);
+ sp = rxrpc_skb(skb);
+ tseq = sp->hdr.seq;
serial = sp->hdr.serial;
+ last = sp->hdr.flags & RXRPC_LAST_PACKET;
- if (sp->hdr.flags & RXRPC_LAST_PACKET)
- last = true;
-
- call->rxtx_buffer[ix] = NULL;
- call->rxtx_annotations[ix] = 0;
/* Barrier against rxrpc_input_data(). */
- smp_store_release(&call->rx_hard_ack, hard_ack);
+ if (after(tseq, call->rx_consumed))
+ smp_store_release(&call->rx_consumed, tseq);
rxrpc_free_skb(skb, rxrpc_skb_freed);
- trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
+ trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
+ serial, call->rx_consumed);
if (last) {
rxrpc_end_rx_phase(call, serial);
- } else {
- /* Check to see if there's an ACK that needs sending. */
- if (atomic_inc_return(&call->ackr_nr_consumed) > 2 &&
- !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
- rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
- rxrpc_propose_ack_rotate_rx);
- rxrpc_transmit_ack_packets(call->peer->local);
- }
+ return;
+ }
+
+ /* The next packet on the queue might entirely overlap with the one we
+ * just consumed; if so, rotate that away also.
+ */
+ skb = skb_peek(&call->recvmsg_queue);
+ if (skb) {
+ sp = rxrpc_skb(skb);
+ if (sp->hdr.seq != call->rx_consumed &&
+ after_eq(call->rx_consumed, sp->hdr.seq))
+ goto further_rotation;
+ }
+
+ /* Check to see if there's an ACK that needs sending. */
+ acked = atomic_add_return(call->rx_consumed - old_consumed,
+ &call->ackr_nr_consumed);
+ if (acked > 2 &&
+ !test_and_set_bit(RXRPC_CALL_IDLE_ACK_PENDING, &call->flags)) {
+ rxrpc_send_ACK(call, RXRPC_ACK_IDLE, serial,
+ rxrpc_propose_ack_rotate_rx);
+ rxrpc_transmit_ack_packets(call->peer->local);
}
}
@@ -285,46 +295,38 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
- rxrpc_serial_t serial;
- rxrpc_seq_t hard_ack, top, seq;
+ rxrpc_seq_t seq = 0;
size_t remain;
unsigned int rx_pkt_offset, rx_pkt_len;
- int ix, copy, ret = -EAGAIN, ret2;
+ int copy, ret = -EAGAIN, ret2;
rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len;
if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
- seq = call->rx_hard_ack;
+ seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1;
goto done;
}
- /* Barriers against rxrpc_input_data(). */
- hard_ack = call->rx_hard_ack;
- seq = hard_ack + 1;
-
- while (top = smp_load_acquire(&call->rx_top),
- before_eq(seq, top)
- ) {
- ix = seq & RXRPC_RXTX_BUFF_MASK;
- skb = call->rxtx_buffer[ix];
- if (!skb) {
- trace_rxrpc_recvdata(call, rxrpc_recvmsg_hole, seq,
- rx_pkt_offset, rx_pkt_len, 0);
- rxrpc_transmit_ack_packets(call->peer->local);
- break;
- }
- smp_rmb();
+ /* No one else can be removing stuff from the queue, so we shouldn't
+ * need the Rx lock to walk it.
+ */
+ skb = skb_peek(&call->recvmsg_queue);
+ while (skb) {
rxrpc_see_skb(skb, rxrpc_skb_seen);
sp = rxrpc_skb(skb);
+ seq = sp->hdr.seq;
- if (!(flags & MSG_PEEK)) {
- serial = sp->hdr.serial;
- trace_rxrpc_receive(call, rxrpc_receive_front,
- serial, seq);
+ if (after_eq(call->rx_consumed, seq)) {
+ kdebug("obsolete %x %x", call->rx_consumed, seq);
+ goto skip_obsolete;
}
+ if (!(flags & MSG_PEEK))
+ trace_rxrpc_receive(call, rxrpc_receive_front,
+ sp->hdr.serial, seq);
+
if (msg)
sock_recv_timestamp(msg, sock->sk, skb);
@@ -338,6 +340,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
ret = ret2;
goto out;
}
+ rxrpc_transmit_ack_packets(call->peer->local);
} else {
trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
rx_pkt_offset, rx_pkt_len, 0);
@@ -370,16 +373,17 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
break;
}
+ skip_obsolete:
/* The whole packet has been transferred. */
if (sp->hdr.flags & RXRPC_LAST_PACKET)
ret = 1;
rx_pkt_offset = 0;
rx_pkt_len = 0;
+ skb = skb_peek_next(skb, &call->recvmsg_queue);
+
if (!(flags & MSG_PEEK))
rxrpc_rotate_rx_window(call);
-
- seq++;
}
out:
@@ -522,8 +526,7 @@ try_again:
ret = 0;
rxrpc_transmit_ack_packets(call->peer->local);
- if (after(call->rx_top, call->rx_hard_ack) &&
- call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
+ if (!skb_queue_empty(&call->recvmsg_queue))
rxrpc_notify_socket(call);
break;
default: