summaryrefslogtreecommitdiffstats
path: root/net/mptcp
diff options
context:
space:
mode:
Diffstat (limited to 'net/mptcp')
-rw-r--r--net/mptcp/options.c16
-rw-r--r--net/mptcp/protocol.c402
-rw-r--r--net/mptcp/protocol.h11
-rw-r--r--net/mptcp/subflow.c32
4 files changed, 332 insertions, 129 deletions
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index fd2c3150e591..9c71f427e6e3 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -304,21 +304,22 @@ static bool mptcp_established_options_mp(struct sock *sk, struct sk_buff *skb,
static void mptcp_write_data_fin(struct mptcp_subflow_context *subflow,
struct mptcp_ext *ext)
{
- ext->data_fin = 1;
-
if (!ext->use_map) {
/* RFC6824 requires a DSS mapping with specific values
* if DATA_FIN is set but no data payload is mapped
*/
+ ext->data_fin = 1;
ext->use_map = 1;
ext->dsn64 = 1;
- ext->data_seq = mptcp_sk(subflow->conn)->write_seq;
+ ext->data_seq = subflow->data_fin_tx_seq;
ext->subflow_seq = 0;
ext->data_len = 1;
- } else {
- /* If there's an existing DSS mapping, DATA_FIN consumes
- * 1 additional byte of mapping space.
+ } else if (ext->data_seq + ext->data_len == subflow->data_fin_tx_seq) {
+ /* If there's an existing DSS mapping and it is the
+ * final mapping, DATA_FIN consumes 1 additional byte of
+ * mapping space.
*/
+ ext->data_fin = 1;
ext->data_len++;
}
}
@@ -356,8 +357,7 @@ static bool mptcp_established_options_dss(struct sock *sk, struct sk_buff *skb,
if (mpext)
opts->ext_copy = *mpext;
- if (skb && tcp_fin &&
- subflow->conn->sk_state != TCP_ESTABLISHED)
+ if (skb && tcp_fin && subflow->data_fin_tx_enable)
mptcp_write_data_fin(subflow, &opts->ext_copy);
ret = true;
}
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 3c19a8efdcea..c0cef07f4382 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -31,6 +31,12 @@ struct mptcp6_sock {
};
#endif
+struct mptcp_skb_cb {
+ u32 offset;
+};
+
+#define MPTCP_SKB_CB(__skb) ((struct mptcp_skb_cb *)&((__skb)->cb[0]))
+
/* If msk has an initial subflow socket, and the MP_CAPABLE handshake has not
* completed yet or has failed, return the subflow socket.
* Otherwise return NULL.
@@ -111,6 +117,143 @@ static struct sock *mptcp_subflow_get(const struct mptcp_sock *msk)
return NULL;
}
+static void __mptcp_move_skb(struct mptcp_sock *msk, struct sock *ssk,
+ struct sk_buff *skb,
+ unsigned int offset, size_t copy_len)
+{
+ struct sock *sk = (struct sock *)msk;
+
+ __skb_unlink(skb, &ssk->sk_receive_queue);
+ skb_set_owner_r(skb, sk);
+ __skb_queue_tail(&sk->sk_receive_queue, skb);
+
+ msk->ack_seq += copy_len;
+ MPTCP_SKB_CB(skb)->offset = offset;
+}
+
+static bool __mptcp_move_skbs_from_subflow(struct mptcp_sock *msk,
+ struct sock *ssk,
+ unsigned int *bytes)
+{
+ struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
+ struct sock *sk = (struct sock *)msk;
+ unsigned int moved = 0;
+ bool more_data_avail;
+ struct tcp_sock *tp;
+ bool done = false;
+
+ if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK)) {
+ int rcvbuf = max(ssk->sk_rcvbuf, sk->sk_rcvbuf);
+
+ if (rcvbuf > sk->sk_rcvbuf)
+ sk->sk_rcvbuf = rcvbuf;
+ }
+
+ tp = tcp_sk(ssk);
+ do {
+ u32 map_remaining, offset;
+ u32 seq = tp->copied_seq;
+ struct sk_buff *skb;
+ bool fin;
+
+ /* try to move as much data as available */
+ map_remaining = subflow->map_data_len -
+ mptcp_subflow_get_map_offset(subflow);
+
+ skb = skb_peek(&ssk->sk_receive_queue);
+ if (!skb)
+ break;
+
+ offset = seq - TCP_SKB_CB(skb)->seq;
+ fin = TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN;
+ if (fin) {
+ done = true;
+ seq++;
+ }
+
+ if (offset < skb->len) {
+ size_t len = skb->len - offset;
+
+ if (tp->urg_data)
+ done = true;
+
+ __mptcp_move_skb(msk, ssk, skb, offset, len);
+ seq += len;
+ moved += len;
+
+ if (WARN_ON_ONCE(map_remaining < len))
+ break;
+ } else {
+ WARN_ON_ONCE(!fin);
+ sk_eat_skb(ssk, skb);
+ done = true;
+ }
+
+ WRITE_ONCE(tp->copied_seq, seq);
+ more_data_avail = mptcp_subflow_data_available(ssk);
+
+ if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf)) {
+ done = true;
+ break;
+ }
+ } while (more_data_avail);
+
+ *bytes = moved;
+
+ return done;
+}
+
+/* In most cases we will be able to lock the mptcp socket. If its already
+ * owned, we need to defer to the work queue to avoid ABBA deadlock.
+ */
+static bool move_skbs_to_msk(struct mptcp_sock *msk, struct sock *ssk)
+{
+ struct sock *sk = (struct sock *)msk;
+ unsigned int moved = 0;
+
+ if (READ_ONCE(sk->sk_lock.owned))
+ return false;
+
+ if (unlikely(!spin_trylock_bh(&sk->sk_lock.slock)))
+ return false;
+
+ /* must re-check after taking the lock */
+ if (!READ_ONCE(sk->sk_lock.owned))
+ __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+
+ spin_unlock_bh(&sk->sk_lock.slock);
+
+ return moved > 0;
+}
+
+void mptcp_data_ready(struct sock *sk, struct sock *ssk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ set_bit(MPTCP_DATA_READY, &msk->flags);
+
+ if (atomic_read(&sk->sk_rmem_alloc) < READ_ONCE(sk->sk_rcvbuf) &&
+ move_skbs_to_msk(msk, ssk))
+ goto wake;
+
+ /* don't schedule if mptcp sk is (still) over limit */
+ if (atomic_read(&sk->sk_rmem_alloc) > READ_ONCE(sk->sk_rcvbuf))
+ goto wake;
+
+ /* mptcp socket is owned, release_cb should retry */
+ if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED,
+ &sk->sk_tsq_flags)) {
+ sock_hold(sk);
+
+ /* need to try again, its possible release_cb() has already
+ * been called after the test_and_set_bit() above.
+ */
+ move_skbs_to_msk(msk, ssk);
+ }
+wake:
+ sk->sk_data_ready(sk);
+}
+
static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
{
if (!msk->cached_ext)
@@ -278,6 +421,15 @@ static int mptcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t len)
return -EOPNOTSUPP;
lock_sock(sk);
+
+ timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
+
+ if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) {
+ ret = sk_stream_wait_connect(sk, &timeo);
+ if (ret)
+ goto out;
+ }
+
ssock = __mptcp_tcp_fallback(msk);
if (unlikely(ssock)) {
fallback:
@@ -286,8 +438,6 @@ fallback:
return ret >= 0 ? ret + copied : (copied ? copied : ret);
}
- timeo = sock_sndtimeo(sk, msg->msg_flags & MSG_DONTWAIT);
-
ssk = mptcp_subflow_get(msk);
if (!ssk) {
release_sock(sk);
@@ -319,65 +469,88 @@ fallback:
ssk_check_wmem(msk, ssk);
release_sock(ssk);
+out:
release_sock(sk);
return ret;
}
-int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
- unsigned int offset, size_t len)
+static void mptcp_wait_data(struct sock *sk, long *timeo)
{
- struct mptcp_read_arg *arg = desc->arg.data;
- size_t copy_len;
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- copy_len = min(desc->count, len);
+ sk_wait_event(sk, timeo,
+ test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
- if (likely(arg->msg)) {
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+}
+
+static int __mptcp_recvmsg_mskq(struct mptcp_sock *msk,
+ struct msghdr *msg,
+ size_t len)
+{
+ struct sock *sk = (struct sock *)msk;
+ struct sk_buff *skb;
+ int copied = 0;
+
+ while ((skb = skb_peek(&sk->sk_receive_queue)) != NULL) {
+ u32 offset = MPTCP_SKB_CB(skb)->offset;
+ u32 data_len = skb->len - offset;
+ u32 count = min_t(size_t, len - copied, data_len);
int err;
- err = skb_copy_datagram_msg(skb, offset, arg->msg, copy_len);
- if (err) {
- pr_debug("error path");
- desc->error = err;
- return err;
+ err = skb_copy_datagram_msg(skb, offset, msg, count);
+ if (unlikely(err < 0)) {
+ if (!copied)
+ return err;
+ break;
}
- } else {
- pr_debug("Flushing skb payload");
- }
- desc->count -= copy_len;
+ copied += count;
+
+ if (count < data_len) {
+ MPTCP_SKB_CB(skb)->offset += count;
+ break;
+ }
+
+ __skb_unlink(skb, &sk->sk_receive_queue);
+ __kfree_skb(skb);
+
+ if (copied >= len)
+ break;
+ }
- pr_debug("consumed %zu bytes, %zu left", copy_len, desc->count);
- return copy_len;
+ return copied;
}
-static void mptcp_wait_data(struct sock *sk, long *timeo)
+static bool __mptcp_move_skbs(struct mptcp_sock *msk)
{
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
- struct mptcp_sock *msk = mptcp_sk(sk);
+ unsigned int moved = 0;
+ bool done;
- add_wait_queue(sk_sleep(sk), &wait);
- sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ do {
+ struct sock *ssk = mptcp_subflow_recv_lookup(msk);
- sk_wait_event(sk, timeo,
- test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
+ if (!ssk)
+ break;
- sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- remove_wait_queue(sk_sleep(sk), &wait);
+ lock_sock(ssk);
+ done = __mptcp_move_skbs_from_subflow(msk, ssk, &moved);
+ release_sock(ssk);
+ } while (!done);
+
+ return moved > 0;
}
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
struct mptcp_sock *msk = mptcp_sk(sk);
- struct mptcp_subflow_context *subflow;
- bool more_data_avail = false;
- struct mptcp_read_arg arg;
- read_descriptor_t desc;
- bool wait_data = false;
struct socket *ssock;
- struct tcp_sock *tp;
- bool done = false;
- struct sock *ssk;
int copied = 0;
int target;
long timeo;
@@ -395,65 +568,26 @@ fallback:
return copied;
}
- arg.msg = msg;
- desc.arg.data = &arg;
- desc.error = 0;
-
timeo = sock_rcvtimeo(sk, nonblock);
len = min_t(size_t, len, INT_MAX);
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
- while (!done) {
- u32 map_remaining;
+ while (len > (size_t)copied) {
int bytes_read;
- ssk = mptcp_subflow_recv_lookup(msk);
- pr_debug("msk=%p ssk=%p", msk, ssk);
- if (!ssk)
- goto wait_for_data;
-
- subflow = mptcp_subflow_ctx(ssk);
- tp = tcp_sk(ssk);
-
- lock_sock(ssk);
- do {
- /* try to read as much data as available */
- map_remaining = subflow->map_data_len -
- mptcp_subflow_get_map_offset(subflow);
- desc.count = min_t(size_t, len - copied, map_remaining);
- pr_debug("reading %zu bytes, copied %d", desc.count,
- copied);
- bytes_read = tcp_read_sock(ssk, &desc,
- mptcp_read_actor);
- if (bytes_read < 0) {
- if (!copied)
- copied = bytes_read;
- done = true;
- goto next;
- }
+ bytes_read = __mptcp_recvmsg_mskq(msk, msg, len - copied);
+ if (unlikely(bytes_read < 0)) {
+ if (!copied)
+ copied = bytes_read;
+ goto out_err;
+ }
- pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
- msk->ack_seq + bytes_read);
- msk->ack_seq += bytes_read;
- copied += bytes_read;
- if (copied >= len) {
- done = true;
- goto next;
- }
- if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
- pr_err("Urgent data present, cannot proceed");
- done = true;
- goto next;
- }
-next:
- more_data_avail = mptcp_subflow_data_available(ssk);
- } while (more_data_avail && !done);
- release_sock(ssk);
- continue;
+ copied += bytes_read;
-wait_for_data:
- more_data_avail = false;
+ if (skb_queue_empty(&sk->sk_receive_queue) &&
+ __mptcp_move_skbs(msk))
+ continue;
/* only the master socket status is relevant here. The exit
* conditions mirror closely tcp_recvmsg()
@@ -494,26 +628,25 @@ wait_for_data:
}
pr_debug("block timeout %ld", timeo);
- wait_data = true;
mptcp_wait_data(sk, &timeo);
if (unlikely(__mptcp_tcp_fallback(msk)))
goto fallback;
}
- if (more_data_avail) {
- if (!test_bit(MPTCP_DATA_READY, &msk->flags))
- set_bit(MPTCP_DATA_READY, &msk->flags);
- } else if (!wait_data) {
+ if (skb_queue_empty(&sk->sk_receive_queue)) {
+ /* entire backlog drained, clear DATA_READY. */
clear_bit(MPTCP_DATA_READY, &msk->flags);
- /* .. race-breaker: ssk might get new data after last
- * data_available() returns false.
+ /* .. race-breaker: ssk might have gotten new data
+ * after last __mptcp_move_skbs() returned false.
*/
- ssk = mptcp_subflow_recv_lookup(msk);
- if (unlikely(ssk))
+ if (unlikely(__mptcp_move_skbs(msk)))
set_bit(MPTCP_DATA_READY, &msk->flags);
+ } else if (unlikely(!test_bit(MPTCP_DATA_READY, &msk->flags))) {
+ /* data to read but mptcp_wait_data() cleared DATA_READY */
+ set_bit(MPTCP_DATA_READY, &msk->flags);
}
-
+out_err:
release_sock(sk);
return copied;
}
@@ -548,12 +681,24 @@ static unsigned int mptcp_sync_mss(struct sock *sk, u32 pmtu)
return 0;
}
+static void mptcp_worker(struct work_struct *work)
+{
+ struct mptcp_sock *msk = container_of(work, struct mptcp_sock, work);
+ struct sock *sk = &msk->sk.icsk_inet.sk;
+
+ lock_sock(sk);
+ __mptcp_move_skbs(msk);
+ release_sock(sk);
+ sock_put(sk);
+}
+
static int __mptcp_init_sock(struct sock *sk)
{
struct mptcp_sock *msk = mptcp_sk(sk);
INIT_LIST_HEAD(&msk->conn_list);
__set_bit(MPTCP_SEND_SPACE, &msk->flags);
+ INIT_WORK(&msk->work, mptcp_worker);
msk->first = NULL;
inet_csk(sk)->icsk_sync_mss = mptcp_sync_mss;
@@ -569,7 +714,16 @@ static int mptcp_init_sock(struct sock *sk)
return __mptcp_init_sock(sk);
}
-static void mptcp_subflow_shutdown(struct sock *ssk, int how)
+static void mptcp_cancel_work(struct sock *sk)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+
+ if (cancel_work_sync(&msk->work))
+ sock_put(sk);
+}
+
+static void mptcp_subflow_shutdown(struct sock *ssk, int how,
+ bool data_fin_tx_enable, u64 data_fin_tx_seq)
{
lock_sock(ssk);
@@ -582,6 +736,14 @@ static void mptcp_subflow_shutdown(struct sock *ssk, int how)
tcp_disconnect(ssk, O_NONBLOCK);
break;
default:
+ if (data_fin_tx_enable) {
+ struct mptcp_subflow_context *subflow;
+
+ subflow = mptcp_subflow_ctx(ssk);
+ subflow->data_fin_tx_seq = data_fin_tx_seq;
+ subflow->data_fin_tx_enable = 1;
+ }
+
ssk->sk_shutdown |= how;
tcp_shutdown(ssk, how);
break;
@@ -598,6 +760,7 @@ static void mptcp_close(struct sock *sk, long timeout)
struct mptcp_subflow_context *subflow, *tmp;
struct mptcp_sock *msk = mptcp_sk(sk);
LIST_HEAD(conn_list);
+ u64 data_fin_tx_seq;
lock_sock(sk);
@@ -606,14 +769,22 @@ static void mptcp_close(struct sock *sk, long timeout)
list_splice_init(&msk->conn_list, &conn_list);
+ data_fin_tx_seq = msk->write_seq;
+
release_sock(sk);
list_for_each_entry_safe(subflow, tmp, &conn_list, node) {
struct sock *ssk = mptcp_subflow_tcp_sock(subflow);
+ subflow->data_fin_tx_seq = data_fin_tx_seq;
+ subflow->data_fin_tx_enable = 1;
__mptcp_close_ssk(sk, ssk, subflow, timeout);
}
+ mptcp_cancel_work(sk);
+
+ __skb_queue_purge(&sk->sk_receive_queue);
+
sk_common_release(sk);
}
@@ -699,7 +870,7 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
*err = -ENOBUFS;
local_bh_enable();
release_sock(sk);
- mptcp_subflow_shutdown(newsk, SHUT_RDWR + 1);
+ mptcp_subflow_shutdown(newsk, SHUT_RDWR + 1, 0, 0);
tcp_close(newsk, 0);
return NULL;
}
@@ -807,6 +978,32 @@ static int mptcp_getsockopt(struct sock *sk, int level, int optname,
return -EOPNOTSUPP;
}
+#define MPTCP_DEFERRED_ALL TCPF_DELACK_TIMER_DEFERRED
+
+/* this is very alike tcp_release_cb() but we must handle differently a
+ * different set of events
+ */
+static void mptcp_release_cb(struct sock *sk)
+{
+ unsigned long flags, nflags;
+
+ do {
+ flags = sk->sk_tsq_flags;
+ if (!(flags & MPTCP_DEFERRED_ALL))
+ return;
+ nflags = flags & ~MPTCP_DEFERRED_ALL;
+ } while (cmpxchg(&sk->sk_tsq_flags, flags, nflags) != flags);
+
+ if (flags & TCPF_DELACK_TIMER_DEFERRED) {
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct sock *ssk;
+
+ ssk = mptcp_subflow_recv_lookup(msk);
+ if (!ssk || !schedule_work(&msk->work))
+ __sock_put(sk);
+ }
+}
+
static int mptcp_get_port(struct sock *sk, unsigned short snum)
{
struct mptcp_sock *msk = mptcp_sk(sk);
@@ -852,6 +1049,10 @@ void mptcp_finish_connect(struct sock *ssk)
WRITE_ONCE(msk->write_seq, subflow->idsn + 1);
WRITE_ONCE(msk->ack_seq, ack_seq);
WRITE_ONCE(msk->can_ack, 1);
+ if (inet_sk_state_load(sk) != TCP_ESTABLISHED) {
+ inet_sk_state_store(sk, TCP_ESTABLISHED);
+ sk->sk_state_change(sk);
+ }
}
static void mptcp_sock_graft(struct sock *sk, struct socket *parent)
@@ -882,6 +1083,7 @@ static struct proto mptcp_prot = {
.destroy = mptcp_destroy,
.sendmsg = mptcp_sendmsg,
.recvmsg = mptcp_recvmsg,
+ .release_cb = mptcp_release_cb,
.hash = inet_hash,
.unhash = inet_unhash,
.get_port = mptcp_get_port,
@@ -1127,7 +1329,7 @@ static int mptcp_shutdown(struct socket *sock, int how)
mptcp_for_each_subflow(msk, subflow) {
struct sock *tcp_sk = mptcp_subflow_tcp_sock(subflow);
- mptcp_subflow_shutdown(tcp_sk, how);
+ mptcp_subflow_shutdown(tcp_sk, how, 1, msk->write_seq);
}
out_unlock:
@@ -1180,6 +1382,8 @@ void mptcp_proto_init(void)
panic("Failed to register MPTCP proto.\n");
inet_register_protosw(&mptcp_protosw);
+
+ BUILD_BUG_ON(sizeof(struct mptcp_skb_cb) > sizeof_field(struct sk_buff, cb));
}
#if IS_ENABLED(CONFIG_MPTCP_IPV6)
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index 9f8663b30456..313558fa8185 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -70,6 +70,7 @@ struct mptcp_sock {
u32 token;
unsigned long flags;
bool can_ack;
+ struct work_struct work;
struct list_head conn_list;
struct skb_ext *cached_ext; /* for the next sendmsg */
struct socket *subflow; /* outgoing connect/listener/!mp_capable */
@@ -124,7 +125,9 @@ struct mptcp_subflow_context {
mpc_map : 1,
data_avail : 1,
rx_eof : 1,
+ data_fin_tx_enable : 1,
can_ack : 1; /* only after processing the remote a key */
+ u64 data_fin_tx_seq;
struct sock *tcp_sock; /* tcp sk backpointer */
struct sock *conn; /* parent mptcp_sock */
@@ -190,17 +193,11 @@ void mptcp_proto_init(void);
int mptcp_proto_v6_init(void);
#endif
-struct mptcp_read_arg {
- struct msghdr *msg;
-};
-
-int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
- unsigned int offset, size_t len);
-
void mptcp_get_options(const struct sk_buff *skb,
struct tcp_options_received *opt_rx);
void mptcp_finish_connect(struct sock *sk);
+void mptcp_data_ready(struct sock *sk, struct sock *ssk);
int mptcp_token_new_request(struct request_sock *req);
void mptcp_token_destroy_request(u32 token);
diff --git a/net/mptcp/subflow.c b/net/mptcp/subflow.c
index 65122edf60aa..0de2a44bdaa0 100644
--- a/net/mptcp/subflow.c
+++ b/net/mptcp/subflow.c
@@ -408,6 +408,18 @@ validate_seq:
return MAPPING_OK;
}
+static int subflow_read_actor(read_descriptor_t *desc,
+ struct sk_buff *skb,
+ unsigned int offset, size_t len)
+{
+ size_t copy_len = min(desc->count, len);
+
+ desc->count -= copy_len;
+
+ pr_debug("flushed %zu bytes, %zu left", copy_len, desc->count);
+ return copy_len;
+}
+
static bool subflow_check_data_avail(struct sock *ssk)
{
struct mptcp_subflow_context *subflow = mptcp_subflow_ctx(ssk);
@@ -482,16 +494,12 @@ static bool subflow_check_data_avail(struct sock *ssk)
pr_debug("discarding %zu bytes, current map len=%d", delta,
map_remaining);
if (delta) {
- struct mptcp_read_arg arg = {
- .msg = NULL,
- };
read_descriptor_t desc = {
.count = delta,
- .arg.data = &arg,
};
int ret;
- ret = tcp_read_sock(ssk, &desc, mptcp_read_actor);
+ ret = tcp_read_sock(ssk, &desc, subflow_read_actor);
if (ret < 0) {
ssk->sk_err = -ret;
goto fatal;
@@ -554,11 +562,8 @@ static void subflow_data_ready(struct sock *sk)
return;
}
- if (mptcp_subflow_data_available(sk)) {
- set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
-
- parent->sk_data_ready(parent);
- }
+ if (mptcp_subflow_data_available(sk))
+ mptcp_data_ready(parent, sk);
}
static void subflow_write_space(struct sock *sk)
@@ -690,11 +695,8 @@ static void subflow_state_change(struct sock *sk)
* a fin packet carrying a DSS can be unnoticed if we don't trigger
* the data available machinery here.
*/
- if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk)) {
- set_bit(MPTCP_DATA_READY, &mptcp_sk(parent)->flags);
-
- parent->sk_data_ready(parent);
- }
+ if (parent && subflow->mp_capable && mptcp_subflow_data_available(sk))
+ mptcp_data_ready(parent, sk);
if (parent && !(parent->sk_shutdown & RCV_SHUTDOWN) &&
!subflow->rx_eof && subflow_is_done(sk)) {