summaryrefslogtreecommitdiffstats
path: root/net/rxrpc
diff options
context:
space:
mode:
Diffstat (limited to 'net/rxrpc')
-rw-r--r--net/rxrpc/af_rxrpc.c9
-rw-r--r--net/rxrpc/ar-internal.h25
-rw-r--r--net/rxrpc/call_event.c23
-rw-r--r--net/rxrpc/call_object.c33
-rw-r--r--net/rxrpc/conn_client.c44
-rw-r--r--net/rxrpc/conn_event.c6
-rw-r--r--net/rxrpc/conn_object.c2
-rw-r--r--net/rxrpc/input.c359
-rw-r--r--net/rxrpc/local_event.c4
-rw-r--r--net/rxrpc/local_object.c104
-rw-r--r--net/rxrpc/output.c9
-rw-r--r--net/rxrpc/peer_event.c10
-rw-r--r--net/rxrpc/protocol.h9
-rw-r--r--net/rxrpc/recvmsg.c53
-rw-r--r--net/rxrpc/rxkad.c32
-rw-r--r--net/rxrpc/sendmsg.c13
-rw-r--r--net/rxrpc/skbuff.c40
17 files changed, 447 insertions, 328 deletions
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index d09eaf153544..d72ddb67bb74 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -193,7 +193,7 @@ static int rxrpc_bind(struct socket *sock, struct sockaddr *saddr, int len)
service_in_use:
write_unlock(&local->services_lock);
- rxrpc_put_local(local);
+ rxrpc_unuse_local(local);
ret = -EADDRINUSE;
error_unlock:
release_sock(&rx->sk);
@@ -402,7 +402,7 @@ EXPORT_SYMBOL(rxrpc_kernel_check_life);
*/
void rxrpc_kernel_probe_life(struct socket *sock, struct rxrpc_call *call)
{
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_check_life);
rxrpc_send_ack_packet(call, true, NULL);
}
@@ -862,7 +862,6 @@ static void rxrpc_sock_destructor(struct sock *sk)
static int rxrpc_release_sock(struct sock *sk)
{
struct rxrpc_sock *rx = rxrpc_sk(sk);
- struct rxrpc_net *rxnet = rxrpc_net(sock_net(&rx->sk));
_enter("%p{%d,%d}", sk, sk->sk_state, refcount_read(&sk->sk_refcnt));
@@ -898,10 +897,8 @@ static int rxrpc_release_sock(struct sock *sk)
rxrpc_release_calls_on_socket(rx);
flush_workqueue(rxrpc_workqueue);
rxrpc_purge_queue(&sk->sk_receive_queue);
- rxrpc_queue_work(&rxnet->service_conn_reaper);
- rxrpc_queue_work(&rxnet->client_conn_reaper);
- rxrpc_put_local(rx->local);
+ rxrpc_unuse_local(rx->local);
rx->local = NULL;
key_put(rx->key);
rx->key = NULL;
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 822f45386e31..8051dfdcf26d 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -185,11 +185,17 @@ struct rxrpc_host_header {
* - max 48 bytes (struct sk_buff::cb)
*/
struct rxrpc_skb_priv {
- union {
- u8 nr_jumbo; /* Number of jumbo subpackets */
- };
+ atomic_t nr_ring_pins; /* Number of rxtx ring pins */
+ u8 nr_subpackets; /* Number of subpackets */
+ u8 rx_flags; /* Received packet flags */
+#define RXRPC_SKB_INCL_LAST 0x01 /* - Includes last packet */
+#define RXRPC_SKB_TX_BUFFER 0x02 /* - Is transmit buffer */
union {
int remain; /* amount of space remaining for next write */
+
+ /* List of requested ACKs on subpackets */
+ unsigned long rx_req_ack[(RXRPC_MAX_NR_JUMBO + BITS_PER_LONG - 1) /
+ BITS_PER_LONG];
};
struct rxrpc_host_header hdr; /* RxRPC packet header from this packet */
@@ -254,7 +260,8 @@ struct rxrpc_security {
*/
struct rxrpc_local {
struct rcu_head rcu;
- atomic_t usage;
+ atomic_t active_users; /* Number of users of the local endpoint */
+ atomic_t usage; /* Number of references to the structure */
struct rxrpc_net *rxnet; /* The network ns in which this resides */
struct list_head link;
struct socket *socket; /* my UDP socket */
@@ -612,8 +619,7 @@ struct rxrpc_call {
#define RXRPC_TX_ANNO_LAST 0x04
#define RXRPC_TX_ANNO_RESENT 0x08
-#define RXRPC_RX_ANNO_JUMBO 0x3f /* Jumbo subpacket number + 1 if not zero */
-#define RXRPC_RX_ANNO_JLAST 0x40 /* Set if last element of a jumbo packet */
+#define RXRPC_RX_ANNO_SUBPACKET 0x3f /* Subpacket number in jumbogram */
#define RXRPC_RX_ANNO_VERIFIED 0x80 /* Set if verified and decrypted */
rxrpc_seq_t tx_hard_ack; /* Dead slot in buffer; the first transmitted but
* not hard-ACK'd packet follows this.
@@ -649,7 +655,6 @@ struct rxrpc_call {
/* receive-phase ACK management */
u8 ackr_reason; /* reason to ACK */
- u16 ackr_skew; /* skew on packet being ACK'd */
rxrpc_serial_t ackr_serial; /* serial of packet being ACK'd */
rxrpc_serial_t ackr_first_seq; /* first sequence number received */
rxrpc_seq_t ackr_prev_seq; /* previous sequence number received */
@@ -743,7 +748,7 @@ int rxrpc_reject_call(struct rxrpc_sock *);
/*
* call_event.c
*/
-void rxrpc_propose_ACK(struct rxrpc_call *, u8, u16, u32, bool, bool,
+void rxrpc_propose_ACK(struct rxrpc_call *, u8, u32, bool, bool,
enum rxrpc_propose_ack_trace);
void rxrpc_process_call(struct work_struct *);
@@ -905,6 +910,7 @@ void rxrpc_disconnect_client_call(struct rxrpc_call *);
void rxrpc_put_client_conn(struct rxrpc_connection *);
void rxrpc_discard_expired_client_conns(struct work_struct *);
void rxrpc_destroy_all_client_connections(struct rxrpc_net *);
+void rxrpc_clean_up_local_conns(struct rxrpc_local *);
/*
* conn_event.c
@@ -1002,6 +1008,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *, const struct sockaddr_rxrpc
struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *);
struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *);
void rxrpc_put_local(struct rxrpc_local *);
+struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *);
+void rxrpc_unuse_local(struct rxrpc_local *);
void rxrpc_queue_local(struct rxrpc_local *);
void rxrpc_destroy_all_locals(struct rxrpc_net *);
@@ -1103,6 +1111,7 @@ void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
void rxrpc_packet_destructor(struct sk_buff *);
void rxrpc_new_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_see_skb(struct sk_buff *, enum rxrpc_skb_trace);
+void rxrpc_eaten_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_get_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_free_skb(struct sk_buff *, enum rxrpc_skb_trace);
void rxrpc_purge_queue(struct sk_buff_head *);
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index bc2adeb3acb9..cedbbb3a7c2e 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -43,8 +43,7 @@ static void rxrpc_propose_ping(struct rxrpc_call *call,
* propose an ACK be sent
*/
static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
- u16 skew, u32 serial, bool immediate,
- bool background,
+ u32 serial, bool immediate, bool background,
enum rxrpc_propose_ack_trace why)
{
enum rxrpc_propose_ack_outcome outcome = rxrpc_propose_ack_use;
@@ -69,14 +68,12 @@ static void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
if (RXRPC_ACK_UPDATEABLE & (1 << ack_reason)) {
outcome = rxrpc_propose_ack_update;
call->ackr_serial = serial;
- call->ackr_skew = skew;
}
if (!immediate)
goto trace;
} else if (prior > rxrpc_ack_priority[call->ackr_reason]) {
call->ackr_reason = ack_reason;
call->ackr_serial = serial;
- call->ackr_skew = skew;
} else {
outcome = rxrpc_propose_ack_subsume;
}
@@ -137,11 +134,11 @@ trace:
* propose an ACK be sent, locking the call structure
*/
void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
- u16 skew, u32 serial, bool immediate, bool background,
+ u32 serial, bool immediate, bool background,
enum rxrpc_propose_ack_trace why)
{
spin_lock_bh(&call->lock);
- __rxrpc_propose_ACK(call, ack_reason, skew, serial,
+ __rxrpc_propose_ACK(call, ack_reason, serial,
immediate, background, why);
spin_unlock_bh(&call->lock);
}
@@ -202,7 +199,7 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
continue;
skb = call->rxtx_buffer[ix];
- rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
if (anno_type == RXRPC_TX_ANNO_UNACK) {
if (ktime_after(skb->tstamp, max_age)) {
@@ -239,7 +236,7 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
ack_ts = ktime_sub(now, call->acks_latest_ts);
if (ktime_to_ns(ack_ts) < call->peer->rtt)
goto out;
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
rxrpc_send_ack_packet(call, true, NULL);
goto out;
@@ -258,18 +255,18 @@ static void rxrpc_resend(struct rxrpc_call *call, unsigned long now_j)
continue;
skb = call->rxtx_buffer[ix];
- rxrpc_get_skb(skb, rxrpc_skb_tx_got);
+ rxrpc_get_skb(skb, rxrpc_skb_got);
spin_unlock_bh(&call->lock);
if (rxrpc_send_data_packet(call, skb, true) < 0) {
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
return;
}
if (rxrpc_is_client_call(call))
rxrpc_expose_client_call(call);
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
spin_lock_bh(&call->lock);
/* We need to clear the retransmit state, but there are two
@@ -372,7 +369,7 @@ recheck_state:
if (time_after_eq(now, t)) {
trace_rxrpc_timer(call, rxrpc_timer_exp_keepalive, now);
cmpxchg(&call->keepalive_at, t, now + MAX_JIFFY_OFFSET);
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, true,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, true,
rxrpc_propose_ack_ping_for_keepalive);
set_bit(RXRPC_CALL_EV_PING, &call->events);
}
@@ -407,7 +404,7 @@ recheck_state:
send_ack = NULL;
if (test_and_clear_bit(RXRPC_CALL_EV_ACK_LOST, &call->events)) {
call->acks_lost_top = call->tx_top;
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, 0, true, false,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, 0, true, false,
rxrpc_propose_ack_ping_for_lost_ack);
send_ack = &call->acks_lost_ping;
}
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 217b12be9e08..014548c259ce 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -422,6 +422,19 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace op)
}
/*
+ * Clean up the RxTx skb ring.
+ */
+static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+{
+ int i;
+
+ for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
+ rxrpc_free_skb(call->rxtx_buffer[i], rxrpc_skb_cleaned);
+ call->rxtx_buffer[i] = NULL;
+ }
+}
+
+/*
* Detach a call from its owning socket.
*/
void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
@@ -429,7 +442,6 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
const void *here = __builtin_return_address(0);
struct rxrpc_connection *conn = call->conn;
bool put = false;
- int i;
_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));
@@ -479,13 +491,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
if (conn)
rxrpc_disconnect_call(call);
- for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
- rxrpc_free_skb(call->rxtx_buffer[i],
- (call->tx_phase ? rxrpc_skb_tx_cleaned :
- rxrpc_skb_rx_cleaned));
- call->rxtx_buffer[i] = NULL;
- }
-
+ rxrpc_cleanup_ring(call);
_leave("");
}
@@ -568,8 +574,6 @@ static void rxrpc_rcu_destroy_call(struct rcu_head *rcu)
*/
void rxrpc_cleanup_call(struct rxrpc_call *call)
{
- int i;
-
_net("DESTROY CALL %d", call->debug_id);
memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
@@ -580,13 +584,8 @@ void rxrpc_cleanup_call(struct rxrpc_call *call)
ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
ASSERTCMP(call->conn, ==, NULL);
- /* Clean up the Rx/Tx buffer */
- for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++)
- rxrpc_free_skb(call->rxtx_buffer[i],
- (call->tx_phase ? rxrpc_skb_tx_cleaned :
- rxrpc_skb_rx_cleaned));
-
- rxrpc_free_skb(call->tx_pending, rxrpc_skb_tx_cleaned);
+ rxrpc_cleanup_ring(call);
+ rxrpc_free_skb(call->tx_pending, rxrpc_skb_cleaned);
call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
}
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index aea82f909c60..3f1da1b49f69 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -1162,3 +1162,47 @@ void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
_leave("");
}
+
+/*
+ * Clean up the client connections on a local endpoint.
+ */
+void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
+{
+ struct rxrpc_connection *conn, *tmp;
+ struct rxrpc_net *rxnet = local->rxnet;
+ unsigned int nr_active;
+ LIST_HEAD(graveyard);
+
+ _enter("");
+
+ spin_lock(&rxnet->client_conn_cache_lock);
+ nr_active = rxnet->nr_active_client_conns;
+
+ list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
+ cache_link) {
+ if (conn->params.local == local) {
+ ASSERTCMP(conn->cache_state, ==, RXRPC_CONN_CLIENT_IDLE);
+
+ trace_rxrpc_client(conn, -1, rxrpc_client_discard);
+ if (!test_and_clear_bit(RXRPC_CONN_EXPOSED, &conn->flags))
+ BUG();
+ conn->cache_state = RXRPC_CONN_CLIENT_INACTIVE;
+ list_move(&conn->cache_link, &graveyard);
+ nr_active--;
+ }
+ }
+
+ rxnet->nr_active_client_conns = nr_active;
+ spin_unlock(&rxnet->client_conn_cache_lock);
+ ASSERTCMP(nr_active, >=, 0);
+
+ while (!list_empty(&graveyard)) {
+ conn = list_entry(graveyard.next,
+ struct rxrpc_connection, cache_link);
+ list_del_init(&conn->cache_link);
+
+ rxrpc_put_connection(conn);
+ }
+
+ _leave(" [culled]");
+}
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index df6624c140be..a1ceef4f5cd0 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -472,7 +472,7 @@ void rxrpc_process_connection(struct work_struct *work)
/* go through the conn-level event packets, releasing the ref on this
* connection that each one has when we've finished with it */
while ((skb = skb_dequeue(&conn->rx_queue))) {
- rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
ret = rxrpc_process_event(conn, skb, &abort_code);
switch (ret) {
case -EPROTO:
@@ -484,7 +484,7 @@ void rxrpc_process_connection(struct work_struct *work)
goto requeue_and_leave;
case -ECONNABORTED:
default:
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
break;
}
}
@@ -501,6 +501,6 @@ requeue_and_leave:
protocol_error:
if (rxrpc_abort_connection(conn, ret, abort_code) < 0)
goto requeue_and_leave;
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
goto out;
}
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 434ef392212b..ed05b6922132 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -398,7 +398,7 @@ void rxrpc_service_connection_reaper(struct work_struct *work)
if (conn->state == RXRPC_CONN_SERVICE_PREALLOC)
continue;
- if (rxnet->live) {
+ if (rxnet->live && !conn->params.local->dead) {
idle_timestamp = READ_ONCE(conn->idle_timestamp);
expire_at = idle_timestamp + rxrpc_connection_expiry * HZ;
if (conn->params.local->service_closed)
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 5bd6f1546e5c..d122c53c8697 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -196,15 +196,14 @@ send_extra_data:
* Ping the other end to fill our RTT cache and to retrieve the rwind
* and MTU parameters.
*/
-static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb,
- int skew)
+static void rxrpc_send_ping(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
ktime_t now = skb->tstamp;
if (call->peer->rtt_usage < 3 ||
ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), now))
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
true, true,
rxrpc_propose_ack_ping_for_params);
}
@@ -234,7 +233,7 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
ix = call->tx_hard_ack & RXRPC_RXTX_BUFF_MASK;
skb = call->rxtx_buffer[ix];
annotation = call->rxtx_annotations[ix];
- rxrpc_see_skb(skb, rxrpc_skb_tx_rotated);
+ rxrpc_see_skb(skb, rxrpc_skb_rotated);
call->rxtx_buffer[ix] = NULL;
call->rxtx_annotations[ix] = 0;
skb->next = list;
@@ -259,7 +258,7 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
skb = list;
list = skb->next;
skb_mark_not_on_list(skb);
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
}
return rot_last;
@@ -348,7 +347,7 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
}
/*
- * Scan a jumbo packet to validate its structure and to work out how many
+ * Scan a data packet to validate its structure and to work out how many
* subpackets it contains.
*
* A jumbo packet is a collection of consecutive packets glued together with
@@ -359,16 +358,21 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
* the last are RXRPC_JUMBO_DATALEN in size. The last subpacket may be of any
* size.
*/
-static bool rxrpc_validate_jumbo(struct sk_buff *skb)
+static bool rxrpc_validate_data(struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int offset = sizeof(struct rxrpc_wire_header);
unsigned int len = skb->len;
- int nr_jumbo = 1;
u8 flags = sp->hdr.flags;
- do {
- nr_jumbo++;
+ for (;;) {
+ if (flags & RXRPC_REQUEST_ACK)
+ __set_bit(sp->nr_subpackets, sp->rx_req_ack);
+ sp->nr_subpackets++;
+
+ if (!(flags & RXRPC_JUMBO_PACKET))
+ break;
+
if (len - offset < RXRPC_JUMBO_SUBPKTLEN)
goto protocol_error;
if (flags & RXRPC_LAST_PACKET)
@@ -377,9 +381,10 @@ static bool rxrpc_validate_jumbo(struct sk_buff *skb)
if (skb_copy_bits(skb, offset, &flags, 1) < 0)
goto protocol_error;
offset += sizeof(struct rxrpc_jumbo_header);
- } while (flags & RXRPC_JUMBO_PACKET);
+ }
- sp->nr_jumbo = nr_jumbo;
+ if (flags & RXRPC_LAST_PACKET)
+ sp->rx_flags |= RXRPC_SKB_INCL_LAST;
return true;
protocol_error:
@@ -400,10 +405,10 @@ protocol_error:
* (that information is encoded in the ACK packet).
*/
static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
- u8 annotation, bool *_jumbo_bad)
+ bool is_jumbo, bool *_jumbo_bad)
{
/* Discard normal packets that are duplicates. */
- if (annotation == 0)
+ if (is_jumbo)
return;
/* Skip jumbo subpackets that are duplicates. When we've had three or
@@ -417,30 +422,30 @@ static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
}
/*
- * Process a DATA packet, adding the packet to the Rx ring.
+ * Process a DATA packet, adding the packet to the Rx ring. The caller's
+ * packet ref must be passed on or discarded.
*/
-static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
- u16 skew)
+static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
enum rxrpc_call_state state;
- unsigned int offset = sizeof(struct rxrpc_wire_header);
- unsigned int ix;
+ unsigned int j;
rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
- rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
- bool immediate_ack = false, jumbo_bad = false, queued;
- u16 len;
- u8 ack = 0, flags, annotation = 0;
+ rxrpc_seq_t seq0 = sp->hdr.seq, hard_ack;
+ bool immediate_ack = false, jumbo_bad = false;
+ u8 ack = 0;
_enter("{%u,%u},{%u,%u}",
- call->rx_hard_ack, call->rx_top, skb->len, seq);
+ call->rx_hard_ack, call->rx_top, skb->len, seq0);
- _proto("Rx DATA %%%u { #%u f=%02x }",
- sp->hdr.serial, seq, sp->hdr.flags);
+ _proto("Rx DATA %%%u { #%u f=%02x n=%u }",
+ sp->hdr.serial, seq0, sp->hdr.flags, sp->nr_subpackets);
state = READ_ONCE(call->state);
- if (state >= RXRPC_CALL_COMPLETE)
+ if (state >= RXRPC_CALL_COMPLETE) {
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
return;
+ }
if (call->state == RXRPC_CALL_SERVER_RECV_REQUEST) {
unsigned long timo = READ_ONCE(call->next_req_timo);
@@ -465,156 +470,157 @@ static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb,
!rxrpc_receiving_reply(call))
goto unlock;
- call->ackr_prev_seq = seq;
-
+ call->ackr_prev_seq = seq0;
hard_ack = READ_ONCE(call->rx_hard_ack);
- if (after(seq, hard_ack + call->rx_winsize)) {
- ack = RXRPC_ACK_EXCEEDS_WINDOW;
- ack_serial = serial;
- goto ack;
- }
- flags = sp->hdr.flags;
- if (flags & RXRPC_JUMBO_PACKET) {
+ if (sp->nr_subpackets > 1) {
if (call->nr_jumbo_bad > 3) {
ack = RXRPC_ACK_NOSPACE;
ack_serial = serial;
goto ack;
}
- annotation = 1;
}
-next_subpacket:
- queued = false;
- ix = seq & RXRPC_RXTX_BUFF_MASK;
- len = skb->len;
- if (flags & RXRPC_JUMBO_PACKET)
- len = RXRPC_JUMBO_DATALEN;
-
- if (flags & RXRPC_LAST_PACKET) {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- seq != call->rx_top) {
- rxrpc_proto_abort("LSN", call, seq);
- goto unlock;
- }
- } else {
- if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
- after_eq(seq, call->rx_top)) {
- rxrpc_proto_abort("LSA", call, seq);
- goto unlock;
+ for (j = 0; j < sp->nr_subpackets; j++) {
+ rxrpc_serial_t serial = sp->hdr.serial + j;
+ rxrpc_seq_t seq = seq0 + j;
+ unsigned int ix = seq & RXRPC_RXTX_BUFF_MASK;
+ bool terminal = (j == sp->nr_subpackets - 1);
+ bool last = terminal && (sp->rx_flags & RXRPC_SKB_INCL_LAST);
+ u8 flags, annotation = j;
+
+ _proto("Rx DATA+%u %%%u { #%x t=%u l=%u }",
+ j, serial, seq, terminal, last);
+
+ if (last) {
+ if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ seq != call->rx_top) {
+ rxrpc_proto_abort("LSN", call, seq);
+ goto unlock;
+ }
+ } else {
+ if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
+ after_eq(seq, call->rx_top)) {
+ rxrpc_proto_abort("LSA", call, seq);
+ goto unlock;
+ }
}
- }
-
- trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
- if (before_eq(seq, hard_ack)) {
- ack = RXRPC_ACK_DUPLICATE;
- ack_serial = serial;
- goto skip;
- }
- if (flags & RXRPC_REQUEST_ACK && !ack) {
- ack = RXRPC_ACK_REQUESTED;
- ack_serial = serial;
- }
+ flags = 0;
+ if (last)
+ flags |= RXRPC_LAST_PACKET;
+ if (!terminal)
+ flags |= RXRPC_JUMBO_PACKET;
+ if (test_bit(j, sp->rx_req_ack))
+ flags |= RXRPC_REQUEST_ACK;
+ trace_rxrpc_rx_data(call->debug_id, seq, serial, flags, annotation);
- if (call->rxtx_buffer[ix]) {
- rxrpc_input_dup_data(call, seq, annotation, &jumbo_bad);
- if (ack != RXRPC_ACK_DUPLICATE) {
+ if (before_eq(seq, hard_ack)) {
ack = RXRPC_ACK_DUPLICATE;
ack_serial = serial;
+ continue;
}
- immediate_ack = true;
- goto skip;
- }
-
- /* Queue the packet. We use a couple of memory barriers here as need
- * to make sure that rx_top is perceived to be set after the buffer
- * pointer and that the buffer pointer is set after the annotation and
- * the skb data.
- *
- * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
- * and also rxrpc_fill_out_ack().
- */
- rxrpc_get_skb(skb, rxrpc_skb_rx_got);
- call->rxtx_annotations[ix] = annotation;
- smp_wmb();
- call->rxtx_buffer[ix] = skb;
- if (after(seq, call->rx_top)) {
- smp_store_release(&call->rx_top, seq);
- } else if (before(seq, call->rx_top)) {
- /* Send an immediate ACK if we fill in a hole */
- if (!ack) {
- ack = RXRPC_ACK_DELAY;
- ack_serial = serial;
- }
- immediate_ack = true;
- }
- if (flags & RXRPC_LAST_PACKET) {
- set_bit(RXRPC_CALL_RX_LAST, &call->flags);
- trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
- } else {
- trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
- }
- queued = true;
- if (after_eq(seq, call->rx_expect_next)) {
- if (after(seq, call->rx_expect_next)) {
- _net("OOS %u > %u", seq, call->rx_expect_next);
- ack = RXRPC_ACK_OUT_OF_SEQUENCE;
- ack_serial = serial;
+ if (call->rxtx_buffer[ix]) {
+ rxrpc_input_dup_data(call, seq, sp->nr_subpackets > 1,
+ &jumbo_bad);
+ if (ack != RXRPC_ACK_DUPLICATE) {
+ ack = RXRPC_ACK_DUPLICATE;
+ ack_serial = serial;
+ }
+ immediate_ack = true;
+ continue;
}
- call->rx_expect_next = seq + 1;
- }
-skip:
- offset += len;
- if (flags & RXRPC_JUMBO_PACKET) {
- if (skb_copy_bits(skb, offset, &flags, 1) < 0) {
- rxrpc_proto_abort("XJF", call, seq);
- goto unlock;
- }
- offset += sizeof(struct rxrpc_jumbo_header);
- seq++;
- serial++;
- annotation++;
- if (flags & RXRPC_JUMBO_PACKET)
- annotation |= RXRPC_RX_ANNO_JLAST;
if (after(seq, hard_ack + call->rx_winsize)) {
ack = RXRPC_ACK_EXCEEDS_WINDOW;
ack_serial = serial;
- if (!jumbo_bad) {
- call->nr_jumbo_bad++;
- jumbo_bad = true;
+ if (flags & RXRPC_JUMBO_PACKET) {
+ if (!jumbo_bad) {
+ call->nr_jumbo_bad++;
+ jumbo_bad = true;
+ }
}
+
goto ack;
}
- _proto("Rx DATA Jumbo %%%u", serial);
- goto next_subpacket;
- }
+ if (flags & RXRPC_REQUEST_ACK && !ack) {
+ ack = RXRPC_ACK_REQUESTED;
+ ack_serial = serial;
+ }
+
+ /* Queue the packet. We use a couple of memory barriers here as need
+ * to make sure that rx_top is perceived to be set after the buffer
+ * pointer and that the buffer pointer is set after the annotation and
+ * the skb data.
+ *
+ * Barriers against rxrpc_recvmsg_data() and rxrpc_rotate_rx_window()
+ * and also rxrpc_fill_out_ack().
+ */
+ if (!terminal)
+ rxrpc_get_skb(skb, rxrpc_skb_got);
+ call->rxtx_annotations[ix] = annotation;
+ smp_wmb();
+ call->rxtx_buffer[ix] = skb;
+ if (after(seq, call->rx_top)) {
+ smp_store_release(&call->rx_top, seq);
+ } else if (before(seq, call->rx_top)) {
+ /* Send an immediate ACK if we fill in a hole */
+ if (!ack) {
+ ack = RXRPC_ACK_DELAY;
+ ack_serial = serial;
+ }
+ immediate_ack = true;
+ }
- if (queued && flags & RXRPC_LAST_PACKET && !ack) {
- ack = RXRPC_ACK_DELAY;
- ack_serial = serial;
+ if (terminal) {
+ /* From this point on, we're not allowed to touch the
+ * packet any longer as its ref now belongs to the Rx
+ * ring.
+ */
+ skb = NULL;
+ }
+
+ if (last) {
+ set_bit(RXRPC_CALL_RX_LAST, &call->flags);
+ if (!ack) {
+ ack = RXRPC_ACK_DELAY;
+ ack_serial = serial;
+ }
+ trace_rxrpc_receive(call, rxrpc_receive_queue_last, serial, seq);
+ } else {
+ trace_rxrpc_receive(call, rxrpc_receive_queue, serial, seq);
+ }
+
+ if (after_eq(seq, call->rx_expect_next)) {
+ if (after(seq, call->rx_expect_next)) {
+ _net("OOS %u > %u", seq, call->rx_expect_next);
+ ack = RXRPC_ACK_OUT_OF_SEQUENCE;
+ ack_serial = serial;
+ }
+ call->rx_expect_next = seq + 1;
+ }
}
ack:
if (ack)
- rxrpc_propose_ACK(call, ack, skew, ack_serial,
+ rxrpc_propose_ACK(call, ack, ack_serial,
immediate_ack, true,
rxrpc_propose_ack_input_data);
else
- rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, skew, serial,
+ rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
false, true,
rxrpc_propose_ack_input_data);
- if (sp->hdr.seq == READ_ONCE(call->rx_hard_ack) + 1) {
+ if (seq0 == READ_ONCE(call->rx_hard_ack) + 1) {
trace_rxrpc_notify_socket(call->debug_id, serial);
rxrpc_notify_socket(call);
}
unlock:
spin_unlock(&call->input_lock);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave(" [queued]");
}
@@ -822,8 +828,7 @@ static void rxrpc_input_soft_acks(struct rxrpc_call *call, u8 *acks,
* soft-ACK means that the packet may be discarded and retransmission
* requested. A phase is complete when all packets are hard-ACK'd.
*/
-static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
- u16 skew)
+static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
struct rxrpc_ack_summary summary = { 0 };
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
@@ -867,11 +872,11 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
if (buf.ack.reason == RXRPC_ACK_PING) {
_proto("Rx ACK %%%u PING Request", sp->hdr.serial);
rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
- skew, sp->hdr.serial, true, true,
+ sp->hdr.serial, true, true,
rxrpc_propose_ack_respond_to_ping);
} else if (sp->hdr.flags & RXRPC_REQUEST_ACK) {
rxrpc_propose_ACK(call, RXRPC_ACK_REQUESTED,
- skew, sp->hdr.serial, true, true,
+ sp->hdr.serial, true, true,
rxrpc_propose_ack_respond_to_ack);
}
@@ -948,7 +953,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb,
RXRPC_TX_ANNO_LAST &&
summary.nr_acks == call->tx_top - hard_ack &&
rxrpc_is_client_call(call))
- rxrpc_propose_ACK(call, RXRPC_ACK_PING, skew, sp->hdr.serial,
+ rxrpc_propose_ACK(call, RXRPC_ACK_PING, sp->hdr.serial,
false, true,
rxrpc_propose_ack_ping_for_lost_reply);
@@ -1004,7 +1009,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
* Process an incoming call packet.
*/
static void rxrpc_input_call_packet(struct rxrpc_call *call,
- struct sk_buff *skb, u16 skew)
+ struct sk_buff *skb)
{
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned long timo;
@@ -1023,11 +1028,11 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
switch (sp->hdr.type) {
case RXRPC_PACKET_TYPE_DATA:
- rxrpc_input_data(call, skb, skew);
- break;
+ rxrpc_input_data(call, skb);
+ goto no_free;
case RXRPC_PACKET_TYPE_ACK:
- rxrpc_input_ack(call, skb, skew);
+ rxrpc_input_ack(call, skb);
break;
case RXRPC_PACKET_TYPE_BUSY:
@@ -1051,6 +1056,8 @@ static void rxrpc_input_call_packet(struct rxrpc_call *call,
break;
}
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+no_free:
_leave("");
}
@@ -1108,8 +1115,12 @@ static void rxrpc_post_packet_to_local(struct rxrpc_local *local,
{
_enter("%p,%p", local, skb);
- skb_queue_tail(&local->event_queue, skb);
- rxrpc_queue_local(local);
+ if (rxrpc_get_local_maybe(local)) {
+ skb_queue_tail(&local->event_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ }
}
/*
@@ -1119,8 +1130,12 @@ static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb)
{
CHECK_SLAB_OKAY(&local->usage);
- skb_queue_tail(&local->reject_queue, skb);
- rxrpc_queue_local(local);
+ if (rxrpc_get_local_maybe(local)) {
+ skb_queue_tail(&local->reject_queue, skb);
+ rxrpc_queue_local(local);
+ } else {
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
+ }
}
/*
@@ -1173,7 +1188,6 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
struct rxrpc_peer *peer = NULL;
struct rxrpc_sock *rx = NULL;
unsigned int channel;
- int skew = 0;
_enter("%p", udp_sk);
@@ -1184,7 +1198,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if (skb->tstamp == 0)
skb->tstamp = ktime_get_real();
- rxrpc_new_skb(skb, rxrpc_skb_rx_received);
+ rxrpc_new_skb(skb, rxrpc_skb_received);
skb_pull(skb, sizeof(struct udphdr));
@@ -1201,7 +1215,7 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
static int lose;
if ((lose++ & 7) == 7) {
trace_rxrpc_rx_lose(sp);
- rxrpc_free_skb(skb, rxrpc_skb_rx_lost);
+ rxrpc_free_skb(skb, rxrpc_skb_lost);
return 0;
}
}
@@ -1233,9 +1247,26 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
if (sp->hdr.callNumber == 0 ||
sp->hdr.seq == 0)
goto bad_message;
- if (sp->hdr.flags & RXRPC_JUMBO_PACKET &&
- !rxrpc_validate_jumbo(skb))
+ if (!rxrpc_validate_data(skb))
goto bad_message;
+
+ /* Unshare the packet so that it can be modified for in-place
+ * decryption.
+ */
+ if (sp->hdr.securityIndex != 0) {
+ struct sk_buff *nskb = skb_unshare(skb, GFP_ATOMIC);
+ if (!nskb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_unshared_nomem);
+ goto out;
+ }
+
+ if (nskb != skb) {
+ rxrpc_eaten_skb(skb, rxrpc_skb_received);
+ rxrpc_new_skb(skb, rxrpc_skb_unshared);
+ skb = nskb;
+ sp = rxrpc_skb(skb);
+ }
+ }
break;
case RXRPC_PACKET_TYPE_CHALLENGE:
@@ -1301,15 +1332,8 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
goto out;
}
- /* Note the serial number skew here */
- skew = (int)sp->hdr.serial - (int)conn->hi_serial;
- if (skew >= 0) {
- if (skew > 0)
- conn->hi_serial = sp->hdr.serial;
- } else {
- skew = -skew;
- skew = min(skew, 65535);
- }
+ if ((int)sp->hdr.serial - (int)conn->hi_serial > 0)
+ conn->hi_serial = sp->hdr.serial;
/* Call-bound packets are routed by connection channel. */
channel = sp->hdr.cid & RXRPC_CHANNELMASK;
@@ -1372,15 +1396,18 @@ int rxrpc_input_packet(struct sock *udp_sk, struct sk_buff *skb)
call = rxrpc_new_incoming_call(local, rx, skb);
if (!call)
goto reject_packet;
- rxrpc_send_ping(call, skb, skew);
+ rxrpc_send_ping(call, skb);
mutex_unlock(&call->user_mutex);
}
- rxrpc_input_call_packet(call, skb, skew);
- goto discard;
+ /* Process a call packet; this either discards or passes on the ref
+ * elsewhere.
+ */
+ rxrpc_input_call_packet(call, skb);
+ goto out;
discard:
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
out:
trace_rxrpc_rx_done(0, 0);
return 0;
diff --git a/net/rxrpc/local_event.c b/net/rxrpc/local_event.c
index e93a78f7c05e..3ce6d628cd75 100644
--- a/net/rxrpc/local_event.c
+++ b/net/rxrpc/local_event.c
@@ -90,7 +90,7 @@ void rxrpc_process_local_events(struct rxrpc_local *local)
if (skb) {
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
- rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
_debug("{%d},{%u}", local->debug_id, sp->hdr.type);
switch (sp->hdr.type) {
@@ -108,7 +108,7 @@ void rxrpc_process_local_events(struct rxrpc_local *local)
break;
}
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
}
_leave("");
diff --git a/net/rxrpc/local_object.c b/net/rxrpc/local_object.c
index b1c71bad510b..36587260cabd 100644
--- a/net/rxrpc/local_object.c
+++ b/net/rxrpc/local_object.c
@@ -79,6 +79,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
local = kzalloc(sizeof(struct rxrpc_local), GFP_KERNEL);
if (local) {
atomic_set(&local->usage, 1);
+ atomic_set(&local->active_users, 1);
local->rxnet = rxnet;
INIT_LIST_HEAD(&local->link);
INIT_WORK(&local->processor, rxrpc_local_processor);
@@ -92,7 +93,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
local->debug_id = atomic_inc_return(&rxrpc_debug_id);
memcpy(&local->srx, srx, sizeof(*srx));
local->srx.srx_service = 0;
- trace_rxrpc_local(local, rxrpc_local_new, 1, NULL);
+ trace_rxrpc_local(local->debug_id, rxrpc_local_new, 1, NULL);
}
_leave(" = %p", local);
@@ -266,11 +267,8 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
* bind the transport socket may still fail if we're attempting
* to use a local address that the dying object is still using.
*/
- if (!rxrpc_get_local_maybe(local)) {
- cursor = cursor->next;
- list_del_init(&local->link);
+ if (!rxrpc_use_local(local))
break;
- }
age = "old";
goto found;
@@ -284,7 +282,10 @@ struct rxrpc_local *rxrpc_lookup_local(struct net *net,
if (ret < 0)
goto sock_error;
- list_add_tail(&local->link, cursor);
+ if (cursor != &rxnet->local_endpoints)
+ list_replace_init(cursor, &local->link);
+ else
+ list_add_tail(&local->link, cursor);
age = "new";
found:
@@ -320,7 +321,7 @@ struct rxrpc_local *rxrpc_get_local(struct rxrpc_local *local)
int n;
n = atomic_inc_return(&local->usage);
- trace_rxrpc_local(local, rxrpc_local_got, n, here);
+ trace_rxrpc_local(local->debug_id, rxrpc_local_got, n, here);
return local;
}
@@ -334,7 +335,8 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
if (local) {
int n = atomic_fetch_add_unless(&local->usage, 1, 0);
if (n > 0)
- trace_rxrpc_local(local, rxrpc_local_got, n + 1, here);
+ trace_rxrpc_local(local->debug_id, rxrpc_local_got,
+ n + 1, here);
else
local = NULL;
}
@@ -342,24 +344,18 @@ struct rxrpc_local *rxrpc_get_local_maybe(struct rxrpc_local *local)
}
/*
- * Queue a local endpoint.
+ * Queue a local endpoint and pass the caller's reference to the work item.
*/
void rxrpc_queue_local(struct rxrpc_local *local)
{
const void *here = __builtin_return_address(0);
+ unsigned int debug_id = local->debug_id;
+ int n = atomic_read(&local->usage);
if (rxrpc_queue_work(&local->processor))
- trace_rxrpc_local(local, rxrpc_local_queued,
- atomic_read(&local->usage), here);
-}
-
-/*
- * A local endpoint reached its end of life.
- */
-static void __rxrpc_put_local(struct rxrpc_local *local)
-{
- _enter("%d", local->debug_id);
- rxrpc_queue_work(&local->processor);
+ trace_rxrpc_local(debug_id, rxrpc_local_queued, n, here);
+ else
+ rxrpc_put_local(local);
}
/*
@@ -372,10 +368,47 @@ void rxrpc_put_local(struct rxrpc_local *local)
if (local) {
n = atomic_dec_return(&local->usage);
- trace_rxrpc_local(local, rxrpc_local_put, n, here);
+ trace_rxrpc_local(local->debug_id, rxrpc_local_put, n, here);
if (n == 0)
- __rxrpc_put_local(local);
+ call_rcu(&local->rcu, rxrpc_local_rcu);
+ }
+}
+
+/*
+ * Start using a local endpoint.
+ */
+struct rxrpc_local *rxrpc_use_local(struct rxrpc_local *local)
+{
+ unsigned int au;
+
+ local = rxrpc_get_local_maybe(local);
+ if (!local)
+ return NULL;
+
+ au = atomic_fetch_add_unless(&local->active_users, 1, 0);
+ if (au == 0) {
+ rxrpc_put_local(local);
+ return NULL;
+ }
+
+ return local;
+}
+
+/*
+ * Cease using a local endpoint. Once the number of active users reaches 0, we
+ * start the closure of the transport in the work processor.
+ */
+void rxrpc_unuse_local(struct rxrpc_local *local)
+{
+ unsigned int au;
+
+ if (local) {
+ au = atomic_dec_return(&local->active_users);
+ if (au == 0)
+ rxrpc_queue_local(local);
+ else
+ rxrpc_put_local(local);
}
}
@@ -393,21 +426,14 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
_enter("%d", local->debug_id);
- /* We can get a race between an incoming call packet queueing the
- * processor again and the work processor starting the destruction
- * process which will shut down the UDP socket.
- */
- if (local->dead) {
- _leave(" [already dead]");
- return;
- }
local->dead = true;
mutex_lock(&rxnet->local_mutex);
list_del_init(&local->link);
mutex_unlock(&rxnet->local_mutex);
- ASSERT(RB_EMPTY_ROOT(&local->client_conns));
+ rxrpc_clean_up_local_conns(local);
+ rxrpc_service_connection_reaper(&rxnet->service_conn_reaper);
ASSERT(!local->service);
if (socket) {
@@ -422,13 +448,11 @@ static void rxrpc_local_destroyer(struct rxrpc_local *local)
*/
rxrpc_purge_queue(&local->reject_queue);
rxrpc_purge_queue(&local->event_queue);
-
- _debug("rcu local %d", local->debug_id);
- call_rcu(&local->rcu, rxrpc_local_rcu);
}
/*
- * Process events on an endpoint
+ * Process events on an endpoint. The work item carries a ref which
+ * we must release.
*/
static void rxrpc_local_processor(struct work_struct *work)
{
@@ -436,13 +460,15 @@ static void rxrpc_local_processor(struct work_struct *work)
container_of(work, struct rxrpc_local, processor);
bool again;
- trace_rxrpc_local(local, rxrpc_local_processing,
+ trace_rxrpc_local(local->debug_id, rxrpc_local_processing,
atomic_read(&local->usage), NULL);
do {
again = false;
- if (atomic_read(&local->usage) == 0)
- return rxrpc_local_destroyer(local);
+ if (atomic_read(&local->active_users) == 0) {
+ rxrpc_local_destroyer(local);
+ break;
+ }
if (!skb_queue_empty(&local->reject_queue)) {
rxrpc_reject_packets(local);
@@ -454,6 +480,8 @@ static void rxrpc_local_processor(struct work_struct *work)
again = true;
}
} while (again);
+
+ rxrpc_put_local(local);
}
/*
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 948e3fe249ec..935bb60fff56 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -87,7 +87,7 @@ static size_t rxrpc_fill_out_ack(struct rxrpc_connection *conn,
*_top = top;
pkt->ack.bufferSpace = htons(8);
- pkt->ack.maxSkew = htons(call->ackr_skew);
+ pkt->ack.maxSkew = htons(0);
pkt->ack.firstPacket = htonl(hard_ack + 1);
pkt->ack.previousPacket = htonl(call->ackr_prev_seq);
pkt->ack.serial = htonl(serial);
@@ -228,7 +228,6 @@ int rxrpc_send_ack_packet(struct rxrpc_call *call, bool ping,
if (ping)
clear_bit(RXRPC_CALL_PINGING, &call->flags);
rxrpc_propose_ACK(call, pkt->ack.reason,
- ntohs(pkt->ack.maxSkew),
ntohl(pkt->ack.serial),
false, true,
rxrpc_propose_ack_retry_tx);
@@ -566,7 +565,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
memset(&whdr, 0, sizeof(whdr));
while ((skb = skb_dequeue(&local->reject_queue))) {
- rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
sp = rxrpc_skb(skb);
switch (skb->mark) {
@@ -582,7 +581,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
ioc = 2;
break;
default:
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
continue;
}
@@ -607,7 +606,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
rxrpc_tx_point_reject);
}
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
}
_leave("");
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index 7666ec72d37e..c97ebdc043e4 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -163,11 +163,11 @@ void rxrpc_error_report(struct sock *sk)
_leave("UDP socket errqueue empty");
return;
}
- rxrpc_new_skb(skb, rxrpc_skb_rx_received);
+ rxrpc_new_skb(skb, rxrpc_skb_received);
serr = SKB_EXT_ERR(skb);
if (!skb->len && serr->ee.ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
_leave("UDP empty message");
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
return;
}
@@ -177,7 +177,7 @@ void rxrpc_error_report(struct sock *sk)
peer = NULL;
if (!peer) {
rcu_read_unlock();
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave(" [no peer]");
return;
}
@@ -189,7 +189,7 @@ void rxrpc_error_report(struct sock *sk)
serr->ee.ee_code == ICMP_FRAG_NEEDED)) {
rxrpc_adjust_mtu(peer, serr);
rcu_read_unlock();
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
rxrpc_put_peer(peer);
_leave(" [MTU update]");
return;
@@ -197,7 +197,7 @@ void rxrpc_error_report(struct sock *sk)
rxrpc_store_error(peer, serr);
rcu_read_unlock();
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
rxrpc_put_peer(peer);
_leave("");
diff --git a/net/rxrpc/protocol.h b/net/rxrpc/protocol.h
index 99ce322d7caa..49bb972539aa 100644
--- a/net/rxrpc/protocol.h
+++ b/net/rxrpc/protocol.h
@@ -89,6 +89,15 @@ struct rxrpc_jumbo_header {
#define RXRPC_JUMBO_DATALEN 1412 /* non-terminal jumbo packet data length */
#define RXRPC_JUMBO_SUBPKTLEN (RXRPC_JUMBO_DATALEN + sizeof(struct rxrpc_jumbo_header))
+/*
+ * The maximum number of subpackets that can possibly fit in a UDP packet is:
+ *
+ * ((max_IP - IP_hdr - UDP_hdr) / RXRPC_JUMBO_SUBPKTLEN) + 1
+ * = ((65535 - 28 - 28) / 1416) + 1
+ * = 46 non-terminal packets and 1 terminal packet.
+ */
+#define RXRPC_MAX_NR_JUMBO 47
+
/*****************************************************************************/
/*
* on-the-wire Rx ACK packet data payload
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 5abf46cf9e6c..3b0becb12041 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -141,7 +141,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
- rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true,
+ rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, serial, false, true,
rxrpc_propose_ack_terminal_ack);
//rxrpc_send_ack_packet(call, false, NULL);
}
@@ -159,7 +159,7 @@ static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
write_unlock_bh(&call->state_lock);
- rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
+ rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial, false, true,
rxrpc_propose_ack_processing_op);
break;
default:
@@ -177,7 +177,8 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
struct sk_buff *skb;
rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top;
- u8 flags;
+ bool last = false;
+ u8 subpacket;
int ix;
_enter("%d", call->debug_id);
@@ -189,30 +190,32 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
hard_ack++;
ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
skb = call->rxtx_buffer[ix];
- rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
+ rxrpc_see_skb(skb, rxrpc_skb_rotated);
sp = rxrpc_skb(skb);
- flags = sp->hdr.flags;
- serial = sp->hdr.serial;
- if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
- serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
+
+ subpacket = call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
+ serial = sp->hdr.serial + subpacket;
+
+ if (subpacket == sp->nr_subpackets - 1 &&
+ sp->rx_flags & RXRPC_SKB_INCL_LAST)
+ last = true;
call->rxtx_buffer[ix] = NULL;
call->rxtx_annotations[ix] = 0;
/* Barrier against rxrpc_input_data(). */
smp_store_release(&call->rx_hard_ack, hard_ack);
- rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
- _debug("%u,%u,%02x", hard_ack, top, flags);
trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
- if (flags & RXRPC_LAST_PACKET) {
+ if (last) {
rxrpc_end_rx_phase(call, serial);
} else {
/* Check to see if there's an ACK that needs sending. */
if (after_eq(hard_ack, call->ackr_consumed + 2) ||
after_eq(top, call->ackr_seen + 2) ||
(hard_ack == top && after(hard_ack, call->ackr_consumed)))
- rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
+ rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, serial,
true, true,
rxrpc_propose_ack_rotate_rx);
if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
@@ -233,18 +236,19 @@ static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
rxrpc_seq_t seq = sp->hdr.seq;
u16 cksum = sp->hdr.cksum;
+ u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
_enter("");
/* For all but the head jumbo subpacket, the security checksum is in a
* jumbo header immediately prior to the data.
*/
- if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
+ if (subpacket > 0) {
__be16 tmp;
if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
BUG();
cksum = ntohs(tmp);
- seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
+ seq += subpacket;
}
return call->conn->security->verify_packet(call, skb, offset, len,
@@ -265,19 +269,18 @@ static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
u8 *_annotation,
unsigned int *_offset, unsigned int *_len)
{
+ struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
unsigned int offset = sizeof(struct rxrpc_wire_header);
unsigned int len;
int ret;
u8 annotation = *_annotation;
+ u8 subpacket = annotation & RXRPC_RX_ANNO_SUBPACKET;
/* Locate the subpacket */
+ offset += subpacket * RXRPC_JUMBO_SUBPKTLEN;
len = skb->len - offset;
- if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
- offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
- RXRPC_JUMBO_SUBPKTLEN);
- len = (annotation & RXRPC_RX_ANNO_JLAST) ?
- skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
- }
+ if (subpacket < sp->nr_subpackets - 1)
+ len = RXRPC_JUMBO_DATALEN;
if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
@@ -303,6 +306,7 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
{
struct rxrpc_skb_priv *sp;
struct sk_buff *skb;
+ rxrpc_serial_t serial;
rxrpc_seq_t hard_ack, top, seq;
size_t remain;
bool last;
@@ -336,12 +340,15 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
break;
}
smp_rmb();
- rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
sp = rxrpc_skb(skb);
- if (!(flags & MSG_PEEK))
+ if (!(flags & MSG_PEEK)) {
+ serial = sp->hdr.serial;
+ serial += call->rxtx_annotations[ix] & RXRPC_RX_ANNO_SUBPACKET;
trace_rxrpc_receive(call, rxrpc_receive_front,
- sp->hdr.serial, seq);
+ serial, seq);
+ }
if (msg)
sock_recv_timestamp(msg, sock->sk, skb);
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index ae8cd8926456..c60c520fde7c 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -187,10 +187,8 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
struct rxrpc_skb_priv *sp;
struct rxrpc_crypt iv;
struct scatterlist sg[16];
- struct sk_buff *trailer;
unsigned int len;
u16 check;
- int nsg;
int err;
sp = rxrpc_skb(skb);
@@ -214,15 +212,14 @@ static int rxkad_secure_packet_encrypt(const struct rxrpc_call *call,
crypto_skcipher_encrypt(req);
/* we want to encrypt the skbuff in-place */
- nsg = skb_cow_data(skb, 0, &trailer);
- err = -ENOMEM;
- if (nsg < 0 || nsg > 16)
+ err = -EMSGSIZE;
+ if (skb_shinfo(skb)->nr_frags > 16)
goto out;
len = data_size + call->conn->size_align - 1;
len &= ~(call->conn->size_align - 1);
- sg_init_table(sg, nsg);
+ sg_init_table(sg, ARRAY_SIZE(sg));
err = skb_to_sgvec(skb, sg, 0, len);
if (unlikely(err < 0))
goto out;
@@ -319,11 +316,10 @@ static int rxkad_verify_packet_1(struct rxrpc_call *call, struct sk_buff *skb,
struct rxkad_level1_hdr sechdr;
struct rxrpc_crypt iv;
struct scatterlist sg[16];
- struct sk_buff *trailer;
bool aborted;
u32 data_size, buf;
u16 check;
- int nsg, ret;
+ int ret;
_enter("");
@@ -336,11 +332,7 @@ static int rxkad_verify_packet_1(struct rxrpc_call *call, struct sk_buff *skb,
/* Decrypt the skbuff in-place. TODO: We really want to decrypt
* directly into the target buffer.
*/
- nsg = skb_cow_data(skb, 0, &trailer);
- if (nsg < 0 || nsg > 16)
- goto nomem;
-
- sg_init_table(sg, nsg);
+ sg_init_table(sg, ARRAY_SIZE(sg));
ret = skb_to_sgvec(skb, sg, offset, 8);
if (unlikely(ret < 0))
return ret;
@@ -388,10 +380,6 @@ protocol_error:
if (aborted)
rxrpc_send_abort_packet(call);
return -EPROTO;
-
-nomem:
- _leave(" = -ENOMEM");
- return -ENOMEM;
}
/*
@@ -406,7 +394,6 @@ static int rxkad_verify_packet_2(struct rxrpc_call *call, struct sk_buff *skb,
struct rxkad_level2_hdr sechdr;
struct rxrpc_crypt iv;
struct scatterlist _sg[4], *sg;
- struct sk_buff *trailer;
bool aborted;
u32 data_size, buf;
u16 check;
@@ -423,12 +410,11 @@ static int rxkad_verify_packet_2(struct rxrpc_call *call, struct sk_buff *skb,
/* Decrypt the skbuff in-place. TODO: We really want to decrypt
* directly into the target buffer.
*/
- nsg = skb_cow_data(skb, 0, &trailer);
- if (nsg < 0)
- goto nomem;
-
sg = _sg;
- if (unlikely(nsg > 4)) {
+ nsg = skb_shinfo(skb)->nr_frags;
+ if (nsg <= 4) {
+ nsg = 4;
+ } else {
sg = kmalloc_array(nsg, sizeof(*sg), GFP_NOIO);
if (!sg)
goto nomem;
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index bae14438f869..6a1547b270fe 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -176,7 +176,7 @@ static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
skb->tstamp = ktime_get_real();
ix = seq & RXRPC_RXTX_BUFF_MASK;
- rxrpc_get_skb(skb, rxrpc_skb_tx_got);
+ rxrpc_get_skb(skb, rxrpc_skb_got);
call->rxtx_annotations[ix] = annotation;
smp_wmb();
call->rxtx_buffer[ix] = skb;
@@ -248,7 +248,7 @@ static int rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
}
out:
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave(" = %d", ret);
return ret;
}
@@ -289,7 +289,7 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
skb = call->tx_pending;
call->tx_pending = NULL;
- rxrpc_see_skb(skb, rxrpc_skb_tx_seen);
+ rxrpc_see_skb(skb, rxrpc_skb_seen);
copied = 0;
do {
@@ -336,7 +336,9 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
if (!skb)
goto maybe_error;
- rxrpc_new_skb(skb, rxrpc_skb_tx_new);
+ sp = rxrpc_skb(skb);
+ sp->rx_flags |= RXRPC_SKB_TX_BUFFER;
+ rxrpc_new_skb(skb, rxrpc_skb_new);
_debug("ALLOC SEND %p", skb);
@@ -346,7 +348,6 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
skb_reserve(skb, call->conn->security_size);
skb->len += call->conn->security_size;
- sp = rxrpc_skb(skb);
sp->remain = chunk;
if (sp->remain > skb_tailroom(skb))
sp->remain = skb_tailroom(skb);
@@ -439,7 +440,7 @@ out:
return ret;
call_terminated:
- rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
+ rxrpc_free_skb(skb, rxrpc_skb_freed);
_leave(" = %d", call->error);
return call->error;
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 9ad5045b7c2f..0348d2bf6f7d 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -14,7 +14,8 @@
#include <net/af_rxrpc.h>
#include "ar-internal.h"
-#define select_skb_count(op) (op >= rxrpc_skb_tx_cleaned ? &rxrpc_n_tx_skbs : &rxrpc_n_rx_skbs)
+#define is_tx_skb(skb) (rxrpc_skb(skb)->rx_flags & RXRPC_SKB_TX_BUFFER)
+#define select_skb_count(skb) (is_tx_skb(skb) ? &rxrpc_n_tx_skbs : &rxrpc_n_rx_skbs)
/*
* Note the allocation or reception of a socket buffer.
@@ -22,8 +23,9 @@
void rxrpc_new_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
{
const void *here = __builtin_return_address(0);
- int n = atomic_inc_return(select_skb_count(op));
- trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
+ int n = atomic_inc_return(select_skb_count(skb));
+ trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n,
+ rxrpc_skb(skb)->rx_flags, here);
}
/*
@@ -33,8 +35,9 @@ void rxrpc_see_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
{
const void *here = __builtin_return_address(0);
if (skb) {
- int n = atomic_read(select_skb_count(op));
- trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
+ int n = atomic_read(select_skb_count(skb));
+ trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n,
+ rxrpc_skb(skb)->rx_flags, here);
}
}
@@ -44,12 +47,23 @@ void rxrpc_see_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
void rxrpc_get_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
{
const void *here = __builtin_return_address(0);
- int n = atomic_inc_return(select_skb_count(op));
- trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
+ int n = atomic_inc_return(select_skb_count(skb));
+ trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n,
+ rxrpc_skb(skb)->rx_flags, here);
skb_get(skb);
}
/*
+ * Note the dropping of a ref on a socket buffer by the core.
+ */
+void rxrpc_eaten_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
+{
+ const void *here = __builtin_return_address(0);
+ int n = atomic_inc_return(&rxrpc_n_rx_skbs);
+ trace_rxrpc_skb(skb, op, 0, n, 0, here);
+}
+
+/*
* Note the destruction of a socket buffer.
*/
void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
@@ -58,8 +72,9 @@ void rxrpc_free_skb(struct sk_buff *skb, enum rxrpc_skb_trace op)
if (skb) {
int n;
CHECK_SLAB_OKAY(&skb->users);
- n = atomic_dec_return(select_skb_count(op));
- trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n, here);
+ n = atomic_dec_return(select_skb_count(skb));
+ trace_rxrpc_skb(skb, op, refcount_read(&skb->users), n,
+ rxrpc_skb(skb)->rx_flags, here);
kfree_skb(skb);
}
}
@@ -72,9 +87,10 @@ void rxrpc_purge_queue(struct sk_buff_head *list)
const void *here = __builtin_return_address(0);
struct sk_buff *skb;
while ((skb = skb_dequeue((list))) != NULL) {
- int n = atomic_dec_return(select_skb_count(rxrpc_skb_rx_purged));
- trace_rxrpc_skb(skb, rxrpc_skb_rx_purged,
- refcount_read(&skb->users), n, here);
+ int n = atomic_dec_return(select_skb_count(skb));
+ trace_rxrpc_skb(skb, rxrpc_skb_purged,
+ refcount_read(&skb->users), n,
+ rxrpc_skb(skb)->rx_flags, here);
kfree_skb(skb);
}
}