mptcp: recvmsg() can drain data from multiple subflows
With the previous patch in place, the msk can detect which subflow has the current map with a simple walk. Update the main loop to always select the 'current' subflow. The exit conditions now closely mirror tcp_recvmsg() to get the expected timeout and signal behavior.

Co-developed-by: Peter Krystad <peter.krystad@linux.intel.com>
Signed-off-by: Peter Krystad <peter.krystad@linux.intel.com>
Co-developed-by: Davide Caratti <dcaratti@redhat.com>
Signed-off-by: Davide Caratti <dcaratti@redhat.com>
Co-developed-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Signed-off-by: Matthieu Baerts <matthieu.baerts@tessares.net>
Co-developed-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Signed-off-by: Mat Martineau <mathew.j.martineau@linux.intel.com>
Co-developed-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Florian Westphal <fw@strlen.de>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Signed-off-by: Christoph Paasch <cpaasch@apple.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
parent
1891c4a076
commit
7a6a6cbc3e
1 changed files with 168 additions and 10 deletions
|
@ -9,6 +9,8 @@
|
||||||
#include <linux/kernel.h>
|
#include <linux/kernel.h>
|
||||||
#include <linux/module.h>
|
#include <linux/module.h>
|
||||||
#include <linux/netdevice.h>
|
#include <linux/netdevice.h>
|
||||||
|
#include <linux/sched/signal.h>
|
||||||
|
#include <linux/atomic.h>
|
||||||
#include <net/sock.h>
|
#include <net/sock.h>
|
||||||
#include <net/inet_common.h>
|
#include <net/inet_common.h>
|
||||||
#include <net/inet_hashtables.h>
|
#include <net/inet_hashtables.h>
|
||||||
|
@ -105,6 +107,21 @@ static bool mptcp_ext_cache_refill(struct mptcp_sock *msk)
|
||||||
return !!msk->cached_ext;
|
return !!msk->cached_ext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct sock *mptcp_subflow_recv_lookup(const struct mptcp_sock *msk)
|
||||||
|
{
|
||||||
|
struct mptcp_subflow_context *subflow;
|
||||||
|
struct sock *sk = (struct sock *)msk;
|
||||||
|
|
||||||
|
sock_owned_by_me(sk);
|
||||||
|
|
||||||
|
mptcp_for_each_subflow(msk, subflow) {
|
||||||
|
if (subflow->data_avail)
|
||||||
|
return mptcp_subflow_tcp_sock(subflow);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
|
static int mptcp_sendmsg_frag(struct sock *sk, struct sock *ssk,
|
||||||
struct msghdr *msg, long *timeo)
|
struct msghdr *msg, long *timeo)
|
||||||
{
|
{
|
||||||
|
@ -269,13 +286,37 @@ int mptcp_read_actor(read_descriptor_t *desc, struct sk_buff *skb,
|
||||||
return copy_len;
|
return copy_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mptcp_wait_data(struct sock *sk, long *timeo)
|
||||||
|
{
|
||||||
|
DEFINE_WAIT_FUNC(wait, woken_wake_function);
|
||||||
|
struct mptcp_sock *msk = mptcp_sk(sk);
|
||||||
|
|
||||||
|
add_wait_queue(sk_sleep(sk), &wait);
|
||||||
|
sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
|
||||||
|
|
||||||
|
sk_wait_event(sk, timeo,
|
||||||
|
test_and_clear_bit(MPTCP_DATA_READY, &msk->flags), &wait);
|
||||||
|
|
||||||
|
sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
|
||||||
|
remove_wait_queue(sk_sleep(sk), &wait);
|
||||||
|
}
|
||||||
|
|
||||||
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
|
static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
|
||||||
int nonblock, int flags, int *addr_len)
|
int nonblock, int flags, int *addr_len)
|
||||||
{
|
{
|
||||||
struct mptcp_sock *msk = mptcp_sk(sk);
|
struct mptcp_sock *msk = mptcp_sk(sk);
|
||||||
|
struct mptcp_subflow_context *subflow;
|
||||||
|
bool more_data_avail = false;
|
||||||
|
struct mptcp_read_arg arg;
|
||||||
|
read_descriptor_t desc;
|
||||||
|
bool wait_data = false;
|
||||||
struct socket *ssock;
|
struct socket *ssock;
|
||||||
|
struct tcp_sock *tp;
|
||||||
|
bool done = false;
|
||||||
struct sock *ssk;
|
struct sock *ssk;
|
||||||
int copied = 0;
|
int copied = 0;
|
||||||
|
int target;
|
||||||
|
long timeo;
|
||||||
|
|
||||||
if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
|
if (msg->msg_flags & ~(MSG_WAITALL | MSG_DONTWAIT))
|
||||||
return -EOPNOTSUPP;
|
return -EOPNOTSUPP;
|
||||||
|
@ -290,16 +331,124 @@ static int mptcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
|
||||||
return copied;
|
return copied;
|
||||||
}
|
}
|
||||||
|
|
||||||
ssk = mptcp_subflow_get(msk);
|
arg.msg = msg;
|
||||||
if (!ssk) {
|
desc.arg.data = &arg;
|
||||||
release_sock(sk);
|
desc.error = 0;
|
||||||
return -ENOTCONN;
|
|
||||||
|
timeo = sock_rcvtimeo(sk, nonblock);
|
||||||
|
|
||||||
|
len = min_t(size_t, len, INT_MAX);
|
||||||
|
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
|
||||||
|
|
||||||
|
while (!done) {
|
||||||
|
u32 map_remaining;
|
||||||
|
int bytes_read;
|
||||||
|
|
||||||
|
ssk = mptcp_subflow_recv_lookup(msk);
|
||||||
|
pr_debug("msk=%p ssk=%p", msk, ssk);
|
||||||
|
if (!ssk)
|
||||||
|
goto wait_for_data;
|
||||||
|
|
||||||
|
subflow = mptcp_subflow_ctx(ssk);
|
||||||
|
tp = tcp_sk(ssk);
|
||||||
|
|
||||||
|
lock_sock(ssk);
|
||||||
|
do {
|
||||||
|
/* try to read as much data as available */
|
||||||
|
map_remaining = subflow->map_data_len -
|
||||||
|
mptcp_subflow_get_map_offset(subflow);
|
||||||
|
desc.count = min_t(size_t, len - copied, map_remaining);
|
||||||
|
pr_debug("reading %zu bytes, copied %d", desc.count,
|
||||||
|
copied);
|
||||||
|
bytes_read = tcp_read_sock(ssk, &desc,
|
||||||
|
mptcp_read_actor);
|
||||||
|
if (bytes_read < 0) {
|
||||||
|
if (!copied)
|
||||||
|
copied = bytes_read;
|
||||||
|
done = true;
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
|
||||||
|
pr_debug("msk ack_seq=%llx -> %llx", msk->ack_seq,
|
||||||
|
msk->ack_seq + bytes_read);
|
||||||
|
msk->ack_seq += bytes_read;
|
||||||
|
copied += bytes_read;
|
||||||
|
if (copied >= len) {
|
||||||
|
done = true;
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
if (tp->urg_data && tp->urg_seq == tp->copied_seq) {
|
||||||
|
pr_err("Urgent data present, cannot proceed");
|
||||||
|
done = true;
|
||||||
|
goto next;
|
||||||
|
}
|
||||||
|
next:
|
||||||
|
more_data_avail = mptcp_subflow_data_available(ssk);
|
||||||
|
} while (more_data_avail && !done);
|
||||||
|
release_sock(ssk);
|
||||||
|
continue;
|
||||||
|
|
||||||
|
wait_for_data:
|
||||||
|
more_data_avail = false;
|
||||||
|
|
||||||
|
/* only the master socket status is relevant here. The exit
|
||||||
|
* conditions mirror closely tcp_recvmsg()
|
||||||
|
*/
|
||||||
|
if (copied >= target)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (copied) {
|
||||||
|
if (sk->sk_err ||
|
||||||
|
sk->sk_state == TCP_CLOSE ||
|
||||||
|
(sk->sk_shutdown & RCV_SHUTDOWN) ||
|
||||||
|
!timeo ||
|
||||||
|
signal_pending(current))
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
if (sk->sk_err) {
|
||||||
|
copied = sock_error(sk);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sk->sk_shutdown & RCV_SHUTDOWN)
|
||||||
|
break;
|
||||||
|
|
||||||
|
if (sk->sk_state == TCP_CLOSE) {
|
||||||
|
copied = -ENOTCONN;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!timeo) {
|
||||||
|
copied = -EAGAIN;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (signal_pending(current)) {
|
||||||
|
copied = sock_intr_errno(timeo);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pr_debug("block timeout %ld", timeo);
|
||||||
|
wait_data = true;
|
||||||
|
mptcp_wait_data(sk, &timeo);
|
||||||
}
|
}
|
||||||
|
|
||||||
copied = sock_recvmsg(ssk->sk_socket, msg, flags);
|
if (more_data_avail) {
|
||||||
|
if (!test_bit(MPTCP_DATA_READY, &msk->flags))
|
||||||
|
set_bit(MPTCP_DATA_READY, &msk->flags);
|
||||||
|
} else if (!wait_data) {
|
||||||
|
clear_bit(MPTCP_DATA_READY, &msk->flags);
|
||||||
|
|
||||||
|
/* .. race-breaker: ssk might get new data after last
|
||||||
|
* data_available() returns false.
|
||||||
|
*/
|
||||||
|
ssk = mptcp_subflow_recv_lookup(msk);
|
||||||
|
if (unlikely(ssk))
|
||||||
|
set_bit(MPTCP_DATA_READY, &msk->flags);
|
||||||
|
}
|
||||||
|
|
||||||
release_sock(sk);
|
release_sock(sk);
|
||||||
|
|
||||||
return copied;
|
return copied;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,10 +609,6 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
|
||||||
msk->write_seq = subflow->idsn + 1;
|
msk->write_seq = subflow->idsn + 1;
|
||||||
ack_seq++;
|
ack_seq++;
|
||||||
msk->ack_seq = ack_seq;
|
msk->ack_seq = ack_seq;
|
||||||
subflow->map_seq = ack_seq;
|
|
||||||
subflow->map_subflow_seq = 1;
|
|
||||||
subflow->rel_write_seq = 1;
|
|
||||||
subflow->tcp_sock = ssk;
|
|
||||||
newsk = new_mptcp_sock;
|
newsk = new_mptcp_sock;
|
||||||
mptcp_copy_inaddrs(newsk, ssk);
|
mptcp_copy_inaddrs(newsk, ssk);
|
||||||
list_add(&subflow->node, &msk->conn_list);
|
list_add(&subflow->node, &msk->conn_list);
|
||||||
|
@ -475,6 +620,19 @@ static struct sock *mptcp_accept(struct sock *sk, int flags, int *err,
|
||||||
bh_unlock_sock(new_mptcp_sock);
|
bh_unlock_sock(new_mptcp_sock);
|
||||||
local_bh_enable();
|
local_bh_enable();
|
||||||
release_sock(sk);
|
release_sock(sk);
|
||||||
|
|
||||||
|
/* the subflow can already receive packet, avoid racing with
|
||||||
|
* the receive path and process the pending ones
|
||||||
|
*/
|
||||||
|
lock_sock(ssk);
|
||||||
|
subflow->map_seq = ack_seq;
|
||||||
|
subflow->map_subflow_seq = 1;
|
||||||
|
subflow->rel_write_seq = 1;
|
||||||
|
subflow->tcp_sock = ssk;
|
||||||
|
subflow->conn = new_mptcp_sock;
|
||||||
|
if (unlikely(!skb_queue_empty(&ssk->sk_receive_queue)))
|
||||||
|
mptcp_subflow_data_available(ssk);
|
||||||
|
release_sock(ssk);
|
||||||
}
|
}
|
||||||
|
|
||||||
return newsk;
|
return newsk;
|
||||||
|
|
Loading…
Add table
Reference in a new issue