summaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2016-06-17 06:35:57 -0400
committerGreg Kroah-Hartman <gregkh@linuxfoundation.org>2017-04-30 05:49:28 +0200
commit59e0cd110fb9fb9aa97bb59c57789adb0e82da8d (patch)
treebffd6b2d305e4167c69f37a9e1fd23eaf9f8dad1 /net
parentabc025d1e88a47c24a0f4411d851c1e9c3e0e87d (diff)
downloadlinux-stable-59e0cd110fb9fb9aa97bb59c57789adb0e82da8d.tar.gz
linux-stable-59e0cd110fb9fb9aa97bb59c57789adb0e82da8d.tar.bz2
linux-stable-59e0cd110fb9fb9aa97bb59c57789adb0e82da8d.zip
tipc: fix socket timer deadlock
commit f1d048f24e66ba85d3dabf3d076cefa5f2b546b0 upstream. We sometimes observe a 'deadly embrace' type deadlock occurring between mutually connected sockets on the same node. This happens when the one-hour peer supervision timers happen to expire simultaneously in both sockets. The scenario is as follows: CPU 1: CPU 2: -------- -------- tipc_sk_timeout(sk1) tipc_sk_timeout(sk2) lock(sk1.slock) lock(sk2.slock) msg_create(probe) msg_create(probe) unlock(sk1.slock) unlock(sk2.slock) tipc_node_xmit_skb() tipc_node_xmit_skb() tipc_node_xmit() tipc_node_xmit() tipc_sk_rcv(sk2) tipc_sk_rcv(sk1) lock(sk2.slock) lock((sk1.slock) filter_rcv() filter_rcv() tipc_sk_proto_rcv() tipc_sk_proto_rcv() msg_create(probe_rsp) msg_create(probe_rsp) tipc_sk_respond() tipc_sk_respond() tipc_node_xmit_skb() tipc_node_xmit_skb() tipc_node_xmit() tipc_node_xmit() tipc_sk_rcv(sk1) tipc_sk_rcv(sk2) lock((sk1.slock) lock((sk2.slock) ===> DEADLOCK ===> DEADLOCK Further analysis reveals that there are three different locations in the socket code where tipc_sk_respond() is called within the context of the socket lock, with ensuing risk of similar deadlocks. We now solve this by passing a buffer queue along with all upcalls where sk_lock.slock may potentially be held. Response or rejected message buffers are accumulated into this queue instead of being sent out directly, and only sent once we know we are safely outside the slock context. Reported-by: GUNA <gbalasun@gmail.com> Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net> Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Diffstat (limited to 'net')
-rw-r--r--net/tipc/socket.c54
1 files changed, 42 insertions, 12 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d119291db852..65171f8e8c45 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -777,9 +777,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
* @tsk: receiving socket
* @skb: pointer to message buffer.
*/
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct sock *sk = &tsk->sk;
+ u32 onode = tsk_own_node(tsk);
struct tipc_msg *hdr = buf_msg(skb);
int mtyp = msg_type(hdr);
int conn_cong;
@@ -792,7 +794,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
if (mtyp == CONN_PROBE) {
msg_set_type(hdr, CONN_PROBE_REPLY);
- tipc_sk_respond(sk, skb, TIPC_OK);
+ if (tipc_msg_reverse(onode, &skb, TIPC_OK))
+ __skb_queue_tail(xmitq, skb);
return;
} else if (mtyp == CONN_ACK) {
conn_cong = tsk_conn_cong(tsk);
@@ -1647,7 +1650,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
*
* Returns true if message was added to socket receive queue, otherwise false
*/
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
{
struct socket *sock = sk->sk_socket;
struct tipc_sock *tsk = tipc_sk(sk);
@@ -1657,7 +1661,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
int usr = msg_user(hdr);
if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
- tipc_sk_proto_rcv(tsk, skb);
+ tipc_sk_proto_rcv(tsk, skb, xmitq);
return false;
}
@@ -1700,7 +1704,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
return true;
reject:
- tipc_sk_respond(sk, skb, err);
+ if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
+ __skb_queue_tail(xmitq, skb);
return false;
}
@@ -1716,9 +1721,24 @@ reject:
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
unsigned int truesize = skb->truesize;
+ struct sk_buff_head xmitq;
+ u32 dnode, selector;
- if (likely(filter_rcv(sk, skb)))
+ __skb_queue_head_init(&xmitq);
+
+ if (likely(filter_rcv(sk, skb, &xmitq))) {
atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
+ return 0;
+ }
+
+ if (skb_queue_empty(&xmitq))
+ return 0;
+
+ /* Send response/rejected message */
+ skb = __skb_dequeue(&xmitq);
+ dnode = msg_destnode(buf_msg(skb));
+ selector = msg_origport(buf_msg(skb));
+ tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
return 0;
}
@@ -1732,12 +1752,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
* Caller must hold socket lock
*/
static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
- u32 dport)
+ u32 dport, struct sk_buff_head *xmitq)
{
+ unsigned long time_limit = jiffies + 2;
+ struct sk_buff *skb;
unsigned int lim;
atomic_t *dcnt;
- struct sk_buff *skb;
- unsigned long time_limit = jiffies + 2;
+ u32 onode;
while (skb_queue_len(inputq)) {
if (unlikely(time_after_eq(jiffies, time_limit)))
@@ -1749,7 +1770,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
/* Add message directly to receive queue if possible */
if (!sock_owned_by_user(sk)) {
- filter_rcv(sk, skb);
+ filter_rcv(sk, skb, xmitq);
continue;
}
@@ -1762,7 +1783,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
continue;
/* Overload => reject message back to sender */
- tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+ onode = tipc_own_addr(sock_net(sk));
+ if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+ __skb_queue_tail(xmitq, skb);
break;
}
}
@@ -1775,12 +1798,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
*/
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
+ struct sk_buff_head xmitq;
u32 dnode, dport = 0;
int err;
struct tipc_sock *tsk;
struct sock *sk;
struct sk_buff *skb;
+ __skb_queue_head_init(&xmitq);
while (skb_queue_len(inputq)) {
dport = tipc_skb_peek_port(inputq, dport);
tsk = tipc_sk_lookup(net, dport);
@@ -1788,9 +1813,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
if (likely(tsk)) {
sk = &tsk->sk;
if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
- tipc_sk_enqueue(inputq, sk, dport);
+ tipc_sk_enqueue(inputq, sk, dport, &xmitq);
spin_unlock_bh(&sk->sk_lock.slock);
}
+ /* Send pending response/rejected messages, if any */
+ while ((skb = __skb_dequeue(&xmitq))) {
+ dnode = msg_destnode(buf_msg(skb));
+ tipc_node_xmit_skb(net, skb, dnode, dport);
+ }
sock_put(sk);
continue;
}