summaryrefslogtreecommitdiffstats
path: root/net/rxrpc/io_thread.c
diff options
context:
space:
mode:
authorDavid Howells <dhowells@redhat.com>2020-01-23 13:13:41 +0000
committerDavid Howells <dhowells@redhat.com>2022-12-01 13:36:42 +0000
commit5e6ef4f1017c7f844e305283bbd8875af475e2fc (patch)
treea1af5b9ab3f538d84a50214be6c41fd0700d4bca /net/rxrpc/io_thread.c
parent393a2a2007d13df7ae54c94328b45b6c2269b6a9 (diff)
downloadlinux-5e6ef4f1017c7f844e305283bbd8875af475e2fc.tar.gz
linux-5e6ef4f1017c7f844e305283bbd8875af475e2fc.tar.bz2
linux-5e6ef4f1017c7f844e305283bbd8875af475e2fc.zip
rxrpc: Make the I/O thread take over the call and local processor work
Move the functions from the call->processor and local->processor work items into the domain of the I/O thread. The call event processor, now called from the I/O thread, then takes over the job of cranking the call state machine, processing incoming packets and transmitting DATA, ACK and ABORT packets. In a future patch, rxrpc_send_ACK() will transmit the ACK on the spot rather than queuing it for later transmission. The call event processor becomes purely received-skb driven. It only transmits things in response to events. We use "pokes" to queue a dummy skb to make it do things like start/resume transmitting data. Timer expiry also results in pokes. The connection event processor, becomes similar, though crypto events, such as dealing with CHALLENGE and RESPONSE packets is offloaded to a work item to avoid doing crypto in the I/O thread. The local event processor is removed and VERSION response packets are generated directly from the packet parser. Similarly, ABORTs generated in response to protocol errors will be transmitted immediately rather than being pushed onto a queue for later transmission. Changes: ======== ver #2) - Fix a couple of introduced lock context imbalances. Signed-off-by: David Howells <dhowells@redhat.com> cc: Marc Dionne <marc.dionne@auristor.com> cc: linux-afs@lists.infradead.org
Diffstat (limited to 'net/rxrpc/io_thread.c')
-rw-r--r--net/rxrpc/io_thread.c319
1 files changed, 146 insertions, 173 deletions
diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c
index bc65d83fab88..19aa315eddf5 100644
--- a/net/rxrpc/io_thread.c
+++ b/net/rxrpc/io_thread.c
@@ -9,6 +9,10 @@
#include "ar-internal.h"
+static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
+ struct sockaddr_rxrpc *peer_srx,
+ struct sk_buff *skb);
+
/*
* handle data received on the local endpoint
* - may be called in interrupt context
@@ -63,45 +67,19 @@ void rxrpc_error_report(struct sock *sk)
}
/*
- * post connection-level events to the connection
- * - this includes challenges, responses, some aborts and call terminal packet
- * retransmission.
+ * Process event packets targeted at a local endpoint.
*/
-static void rxrpc_post_packet_to_conn(struct rxrpc_connection *conn,
- struct sk_buff *skb)
+static void rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
{
- _enter("%p,%p", conn, skb);
-
- rxrpc_get_skb(skb, rxrpc_skb_get_conn_work);
- skb_queue_tail(&conn->rx_queue, skb);
- rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work);
-}
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ char v;
-/*
- * post endpoint-level events to the local endpoint
- * - this includes debug and version messages
- */
-static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
- struct sk_buff *skb)
-{
- _enter("%p,%p", local, skb);
+ _enter("");
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- rxrpc_get_skb(skb, rxrpc_skb_get_local_work);
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
- }
-}
-
-/*
- * put a packet up for transport-level abort
- */
-static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
-{
- if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) {
- rxrpc_get_skb(skb, rxrpc_skb_get_reject_work);
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
+ rxrpc_see_skb(skb, rxrpc_skb_see_version);
+ if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
+ if (v == 0)
+ rxrpc_send_version_request(local, &sp->hdr, skb);
}
}
@@ -156,22 +134,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
{
struct rxrpc_connection *conn;
struct sockaddr_rxrpc peer_srx;
- struct rxrpc_channel *chan;
- struct rxrpc_call *call = NULL;
struct rxrpc_skb_priv *sp;
struct rxrpc_peer *peer = NULL;
- struct rxrpc_sock *rx = NULL;
struct sk_buff *skb = *_skb;
- unsigned int channel;
-
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
+ int ret = 0;
skb_pull(skb, sizeof(struct udphdr));
- /* The UDP protocol already released all skb resources;
- * we are free to add our own data there.
- */
sp = rxrpc_skb(skb);
/* dig out the RxRPC connection details */
@@ -186,15 +155,13 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
}
}
- if (skb->tstamp == 0)
- skb->tstamp = ktime_get_real();
trace_rxrpc_rx_packet(sp);
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_VERSION:
if (rxrpc_to_client(sp))
return 0;
- rxrpc_post_packet_to_local(local, skb);
+ rxrpc_input_version(local, skb);
return 0;
case RXRPC_PACKET_TYPE_BUSY:
@@ -259,7 +226,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
goto bad_message;
if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
- return 0; /* Unsupported address type - discard. */
+ return true; /* Unsupported address type - discard. */
if (peer_srx.transport.family != local->srx.transport.family &&
(peer_srx.transport.family == AF_INET &&
@@ -267,171 +234,172 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
peer_srx.transport.family,
local->srx.transport.family);
- return 0; /* Wrong address type - discard. */
+ return true; /* Wrong address type - discard. */
+ }
+
+ if (rxrpc_to_client(sp)) {
+ rcu_read_lock();
+ conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
+ conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
+ rcu_read_unlock();
+ if (!conn) {
+ trace_rxrpc_abort(0, "NCC", sp->hdr.cid,
+ sp->hdr.callNumber, sp->hdr.seq,
+ RXKADINCONSISTENCY, EBADMSG);
+ goto protocol_error;
+ }
+
+ ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
+ rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
+ return ret;
}
+ /* We need to look up service connections by the full protocol
+ * parameter set. We look up the peer first as an intermediate step
+ * and then the connection from the peer's tree.
+ */
rcu_read_lock();
- if (rxrpc_to_server(sp)) {
- /* Weed out packets to services we're not offering. Packets
- * that would begin a call are explicitly rejected and the rest
- * are just discarded.
- */
- rx = rcu_dereference(local->service);
- if (!rx || (sp->hdr.serviceId != rx->srx.srx_service &&
- sp->hdr.serviceId != rx->second_service)
- ) {
- rcu_read_unlock();
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA &&
- sp->hdr.seq == 1)
- goto unsupported_service;
- return 0;
- }
+ peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
+ if (!peer) {
+ rcu_read_unlock();
+ return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
}
- conn = rxrpc_find_connection_rcu(local, &peer_srx, skb, &peer);
+ conn = rxrpc_find_service_conn_rcu(peer, skb);
+ conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
if (conn) {
- if (sp->hdr.securityIndex != conn->security_ix)
- goto wrong_security;
+ rcu_read_unlock();
+ ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
+ rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
+ return ret;
+ }
- if (sp->hdr.serviceId != conn->service_id) {
- int old_id;
+ peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
+ rcu_read_unlock();
- if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
- goto reupgrade;
- old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
- sp->hdr.serviceId);
+ ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
+ rxrpc_put_peer(peer, rxrpc_peer_put_input);
+ if (ret < 0)
+ goto reject_packet;
+ return 0;
- if (old_id != conn->orig_service_id &&
- old_id != sp->hdr.serviceId)
- goto reupgrade;
- }
+bad_message:
+ trace_rxrpc_abort(0, "BAD", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
+ RX_PROTOCOL_ERROR, EBADMSG);
+protocol_error:
+ skb->priority = RX_PROTOCOL_ERROR;
+ skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
+reject_packet:
+ rxrpc_reject_packet(local, skb);
+ return ret;
+}
- if (sp->hdr.callNumber == 0) {
- /* Connection-level packet */
- _debug("CONN %p {%d}", conn, conn->debug_id);
- conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_conn_input);
- rcu_read_unlock();
- if (conn) {
- rxrpc_post_packet_to_conn(conn, skb);
- rxrpc_put_connection(conn, rxrpc_conn_put_conn_input);
- }
- return 0;
- }
+/*
+ * Deal with a packet that's associated with an extant connection.
+ */
+static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
+ struct sockaddr_rxrpc *peer_srx,
+ struct sk_buff *skb)
+{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+ struct rxrpc_channel *chan;
+ struct rxrpc_call *call = NULL;
+ unsigned int channel;
- if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
- conn->hi_serial = sp->hdr.serial;
+ if (sp->hdr.securityIndex != conn->security_ix)
+ goto wrong_security;
- /* Call-bound packets are routed by connection channel. */
- channel = sp->hdr.cid & RXRPC_CHANNELMASK;
- chan = &conn->channels[channel];
+ if (sp->hdr.serviceId != conn->service_id) {
+ int old_id;
- /* Ignore really old calls */
- if (sp->hdr.callNumber < chan->last_call) {
- rcu_read_unlock();
- return 0;
- }
+ if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
+ goto reupgrade;
+ old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
+ sp->hdr.serviceId);
- if (sp->hdr.callNumber == chan->last_call) {
- if (chan->call ||
- sp->hdr.type == RXRPC_PACKET_TYPE_ABORT) {
- rcu_read_unlock();
- return 0;
- }
+ if (old_id != conn->orig_service_id &&
+ old_id != sp->hdr.serviceId)
+ goto reupgrade;
+ }
- /* For the previous service call, if completed
- * successfully, we discard all further packets.
- */
- if (rxrpc_conn_is_service(conn) &&
- chan->last_type == RXRPC_PACKET_TYPE_ACK) {
- rcu_read_unlock();
- return 0;
- }
+ if (after(sp->hdr.serial, conn->hi_serial))
+ conn->hi_serial = sp->hdr.serial;
- /* But otherwise we need to retransmit the final packet
- * from data cached in the connection record.
- */
- if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
- trace_rxrpc_rx_data(chan->call_debug_id,
- sp->hdr.seq,
- sp->hdr.serial,
- sp->hdr.flags);
- conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
- rcu_read_unlock();
- if (conn) {
- rxrpc_post_packet_to_conn(conn, skb);
- rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
- }
+ /* It's a connection-level packet if the call number is 0. */
+ if (sp->hdr.callNumber == 0)
+ return rxrpc_input_conn_packet(conn, skb);
+
+ /* Call-bound packets are routed by connection channel. */
+ channel = sp->hdr.cid & RXRPC_CHANNELMASK;
+ chan = &conn->channels[channel];
+
+ /* Ignore really old calls */
+ if (sp->hdr.callNumber < chan->last_call)
+ return 0;
+
+ if (sp->hdr.callNumber == chan->last_call) {
+ if (chan->call ||
+ sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
return 0;
- }
- call = rcu_dereference(chan->call);
+ /* For the previous service call, if completed successfully, we
+ * discard all further packets.
+ */
+ if (rxrpc_conn_is_service(conn) &&
+ chan->last_type == RXRPC_PACKET_TYPE_ACK)
+ return 0;
- if (sp->hdr.callNumber > chan->call_id) {
- if (rxrpc_to_client(sp)) {
- rcu_read_unlock();
- goto reject_packet;
- }
- if (call) {
- rxrpc_input_implicit_end_call(conn, call);
- chan->call = NULL;
- call = NULL;
- }
- }
+ /* But otherwise we need to retransmit the final packet from
+ * data cached in the connection record.
+ */
+ if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
+ trace_rxrpc_rx_data(chan->call_debug_id,
+ sp->hdr.seq,
+ sp->hdr.serial,
+ sp->hdr.flags);
+ rxrpc_input_conn_packet(conn, skb);
+ return 0;
+ }
- if (call && !rxrpc_try_get_call(call, rxrpc_call_get_input))
- call = NULL;
+ rcu_read_lock();
+ call = rxrpc_try_get_call(rcu_dereference(chan->call),
+ rxrpc_call_get_input);
+ rcu_read_unlock();
+
+ if (sp->hdr.callNumber > chan->call_id) {
+ if (rxrpc_to_client(sp)) {
+ rxrpc_put_call(call, rxrpc_call_put_input);
+ goto reject_packet;
+ }
if (call) {
- if (sp->hdr.serviceId != call->dest_srx.srx_service)
- call->dest_srx.srx_service = sp->hdr.serviceId;
- if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
- call->rx_serial = sp->hdr.serial;
- if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
- set_bit(RXRPC_CALL_RX_HEARD, &call->flags);
+ rxrpc_implicit_end_call(call, skb);
+ rxrpc_put_call(call, rxrpc_call_put_input);
+ call = NULL;
}
}
if (!call) {
- if (rxrpc_to_client(sp) ||
- sp->hdr.type != RXRPC_PACKET_TYPE_DATA) {
- rcu_read_unlock();
+ if (rxrpc_to_client(sp))
goto bad_message;
- }
- if (sp->hdr.seq != 1) {
- rcu_read_unlock();
+ if (rxrpc_new_incoming_call(conn->local, conn->peer, conn,
+ peer_srx, skb))
return 0;
- }
- call = rxrpc_new_incoming_call(local, rx, &peer_srx, skb);
- if (!call) {
- rcu_read_unlock();
- goto reject_packet;
- }
+ goto reject_packet;
}
- rcu_read_unlock();
-
- /* Process a call packet. */
rxrpc_input_call_event(call, skb);
rxrpc_put_call(call, rxrpc_call_put_input);
- trace_rxrpc_rx_done(0, 0);
return 0;
wrong_security:
- rcu_read_unlock();
trace_rxrpc_abort(0, "SEC", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RXKADINCONSISTENCY, EBADMSG);
skb->priority = RXKADINCONSISTENCY;
goto post_abort;
-unsupported_service:
- trace_rxrpc_abort(0, "INV", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
- RX_INVALID_OPERATION, EOPNOTSUPP);
- skb->priority = RX_INVALID_OPERATION;
- goto post_abort;
-
reupgrade:
- rcu_read_unlock();
trace_rxrpc_abort(0, "UPG", sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
RX_PROTOCOL_ERROR, EBADMSG);
goto protocol_error;
@@ -444,7 +412,7 @@ protocol_error:
post_abort:
skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
reject_packet:
- rxrpc_reject_packet(local, skb);
+ rxrpc_reject_packet(conn->local, skb);
return 0;
}
@@ -479,6 +447,11 @@ int rxrpc_io_thread(void *data)
continue;
}
+ if (!list_empty(&local->ack_tx_queue)) {
+ rxrpc_transmit_ack_packets(local);
+ continue;
+ }
+
/* Process received packets and errors. */
if ((skb = __skb_dequeue(&rx_queue))) {
switch (skb->mark) {