linux TCP Prequeue队列和backlog队列

概念

在应用程序调用系统调用持有sock锁的时候,协议栈把数据包加入backlog,可以在应用程序释放锁的时候处理数据包。
可见backlog中的包很快便会在应用程序上下文中处理。

而应用程序没有锁的时候,则只能尝试把数据把放到另一个prequeue队列中。
tcp_recvmsg()中并不会一直占有锁,在处理backlog的时候会主动释放锁,在阻塞等待的时候也会释放锁,这就给了数据包加入prequeue的时机。

在delack定时器中,因为要尽可能多的ack数据,也会处理prequeue队列。因为delack也需要获得锁,因此在release_sock中处理backlog队列,如果有delack被推迟执行,则很可能也会处理prequeue队列。

因此协议栈先尝试把包加入backlog中, 在尝试加入prequeue中,如果都不能则在软中断上下文中处理数据包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int tcp_v4_rcv(struct sk_buff *skb)
{
...
bh_lock_sock_nested(sk); //slock
tcp_segs_in(tcp_sk(sk), skb);
ret = 0;
if (!sock_owned_by_user(sk)) { //用户进程没有获得锁。 sock_owned_by_user被slock保护
if (!tcp_prequeue(sk, skb)) // 加入prequeue队列中,返回处理更多数据包,同时让数据包在用户上下文中处理数据包
ret = tcp_v4_do_rcv(sk, skb); //没有加入prequeue,直接处理
} else if (tcp_add_backlog(sk, skb)) { //用户进程获得锁,则直接挂到backlog中,等用户进程释放锁的时候处理
goto discard_and_relse;
}
bh_unlock_sock(sk);
...
}
  • 为什么不在协议栈中直接处理?
  1. cache的角度
    prequeue和backlog都是为了在应用程序上下文去处理数据包。
    因为softirq上下文和应用程序上下文之间切换,会造成cache刷新。
    比如ksoftirqd和应用程序不在一个cpu上; 或是协议栈处理完后通知应用程序,应用程序被唤醒,同样会造成造成cache刷新

  2. ack的角度
    如果直接在协议栈快速处理包,则很可能导致快速ack,使对方快速达到很大的发送速率,但是本地用户进程可能并不活跃,就会导致接收缓存满了,对方瞬间停止发送。 造成很明显的抖动。
    因此在应用程序处理数据包和ack的发送,可以反映用户进程的实时情况。
    但同时对突发的小包和需要低延迟的消息,放入prequeue中,也会造成非常不好的影响

  3. 锁的角度
    softirq中不能睡眠,也不应该跟应用程序抢锁, 如果tcp_recvmsg读取一大块内存,tcp_v4_rcv就需要很久才能抢到锁。
    因此使用prequeue就可以避免这样的情况。

backlog队列

sock_owned_by_user(sk)会被bh_lock_sock_nested(sk)保护,因此如果sock_owned_by_user(sk)并把skb加入backlog之前,应用程序不会release_sock。

tcp_add_backlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
bool tcp_add_backlog(struct sock *sk, struct sk_buff *skb)
{
u32 limit = sk->sk_rcvbuf + sk->sk_sndbuf;
/* Only socket owner can try to collapse/prune rx queues
* to reduce memory overhead, so add a little headroom here.
* Few sockets backlog are possibly concurrently non empty.
*/
limit += 64*1024;
/* In case all data was pulled from skb frags (in __pskb_pull_tail()),
* we can fix skb->truesize to its real value to avoid future drops.
* This is valid because skb is not yet charged to the socket.
* It has been noticed pure SACK packets were sometimes dropped
* (if cooked by drivers without copybreak feature).
*/
if (!skb->data_len)
skb->truesize = SKB_TRUESIZE(skb_end_offset(skb));
if (unlikely(sk_add_backlog(sk, skb, limit))) {
bh_unlock_sock(sk);
__NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPBACKLOGDROP);
return true;
}
return false;
}
static inline bool sk_rcvqueues_full(const struct sock *sk, unsigned int limit)
{
unsigned int qsize = sk->sk_backlog.len + atomic_read(&sk->sk_rmem_alloc);
return qsize > limit;
}
/* The per-socket spinlock must be held here. */
static inline __must_check int sk_add_backlog(struct sock *sk, struct sk_buff *skb,
unsigned int limit)
{
if (sk_rcvqueues_full(sk, limit)) //backlog和sk_receive_queue中缓存使用是否超过限制
return -ENOBUFS;
/*
* If the skb was allocated from pfmemalloc reserves, only
* allow SOCK_MEMALLOC sockets to use it as this socket is
* helping free memory
*/
if (skb_pfmemalloc(skb) && !sock_flag(sk, SOCK_MEMALLOC))
return -ENOMEM;
__sk_add_backlog(sk, skb); //添加到backlog
sk->sk_backlog.len += skb->truesize;
return 0;
}
/* OOB backlog add */
static inline void __sk_add_backlog(struct sock *sk, struct sk_buff *skb)
{
/* dont let skb dst not refcounted, we are going to leave rcu lock */
skb_dst_force_safe(skb);
if (!sk->sk_backlog.tail)
sk->sk_backlog.head = skb;
else
sk->sk_backlog.tail->next = skb;
sk->sk_backlog.tail = skb;
skb->next = NULL;
}

release_sock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
void release_sock(struct sock *sk)
{
spin_lock_bh(&sk->sk_lock.slock);
if (sk->sk_backlog.tail)
__release_sock(sk); //sk_backlog_rcv处理backlog队列
/* Warning : release_cb() might need to release sk ownership,
* ie call sock_release_ownership(sk) before us.
*/
if (sk->sk_prot->release_cb) // tcp_release_cb
sk->sk_prot->release_cb(sk); //如果delack推迟执行,也会处理prequeue队列
sock_release_ownership(sk); //!sock_owned_by_user(sk)
if (waitqueue_active(&sk->sk_lock.wq)) //如果其他进程在等待这个ownership
wake_up(&sk->sk_lock.wq);
spin_unlock_bh(&sk->sk_lock.slock);
}
static void __release_sock(struct sock *sk)
__releases(&sk->sk_lock.slock)
__acquires(&sk->sk_lock.slock)
{
struct sk_buff *skb, *next;
while ((skb = sk->sk_backlog.head) != NULL) {
sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
spin_unlock_bh(&sk->sk_lock.slock);
do {
next = skb->next;
prefetch(next);
WARN_ON_ONCE(skb_dst_is_noref(skb));
skb->next = NULL;
sk_backlog_rcv(sk, skb); //处理backlog,调用tcp_v4_do_rcv
cond_resched();
skb = next;
} while (skb != NULL);
spin_lock_bh(&sk->sk_lock.slock);
}
/*
* Doing the zeroing here guarantee we can not loop forever
* while a wild producer attempts to flood us.
*/
sk->sk_backlog.len = 0;
}

prequeue队列

如果不能加入backlog队列,则尝试加入prequeue队列。
如果sysctl_tcp_low_latency=1表示系统关闭prequeue队列, 并且如果有应用程序正在recvmsg,则才会把数据包放入prequeue中。
如果prequeue队列太长,则直接在softirq上下文处理该队列。 如果是prequeue第一个包则唤醒用户进程, 并设置dack定时器,
在dack定时器中也会处理prequeue队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
bool tcp_prequeue(struct sock *sk, struct sk_buff *skb)
{
struct tcp_sock *tp = tcp_sk(sk);
if (sysctl_tcp_low_latency || !tp->ucopy.task) //没有启用prequeue,没有应用程序在处理
return false;
if (skb->len <= tcp_hdrlen(skb) &&
skb_queue_len(&tp->ucopy.prequeue) == 0) //synack
return false;
/* Before escaping RCU protected region, we need to take care of skb
* dst. Prequeue is only enabled for established sockets.
* For such sockets, we might need the skb dst only to set sk->sk_rx_dst
* Instead of doing full sk_rx_dst validity here, let's perform
* an optimistic check.
*/
if (likely(sk->sk_rx_dst))
skb_dst_drop(skb);
else
skb_dst_force_safe(skb);
__skb_queue_tail(&tp->ucopy.prequeue, skb); //添加到prequeue
tp->ucopy.memory += skb->truesize;
if (skb_queue_len(&tp->ucopy.prequeue) >= 32 ||
tp->ucopy.memory + atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf) { //数据包过多或者缓存不够,直接在softirq上下文中处理
struct sk_buff *skb1;
BUG_ON(sock_owned_by_user(sk));
__NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPPREQUEUEDROPPED,
skb_queue_len(&tp->ucopy.prequeue));
while ((skb1 = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
sk_backlog_rcv(sk, skb1);
tp->ucopy.memory = 0;
} else if (skb_queue_len(&tp->ucopy.prequeue) == 1) { //第一个包,唤醒用户进程
wake_up_interruptible_sync_poll(sk_sleep(sk), //sk_wait_data中有个时间窗口,可能不会唤醒用户进程。 如果没有唤醒则会导致应用程序等待超时,还有cache的刷新
POLLIN | POLLRDNORM | POLLRDBAND);
if (!inet_csk_ack_scheduled(sk)) //之前没有ack被推迟,重置dack定时器, 因为不知道应用程序会什么时候处理prequeue
inet_csk_reset_xmit_timer(sk, ICSK_TIME_DACK, //如果应用程序一直不处理,则在tcp_delack_timer_handler中处理prequeue队列
(3 * tcp_rto_min(sk)) / 4,
TCP_RTO_MAX);
}
return true;
}

dack定时器处理prequeue

在应用程序没有加锁的时候,调用tcp_delack_timer_handler,处理prequeue,并判断是否需要发送ack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/* Called with BH disabled */
void tcp_delack_timer_handler(struct sock *sk)
{
struct tcp_sock *tp = tcp_sk(sk);
struct inet_connection_sock *icsk = inet_csk(sk);
sk_mem_reclaim_partial(sk);
if (sk->sk_state == TCP_CLOSE || !(icsk->icsk_ack.pending & ICSK_ACK_TIMER))
goto out;
if (time_after(icsk->icsk_ack.timeout, jiffies)) {
sk_reset_timer(sk, &icsk->icsk_delack_timer, icsk->icsk_ack.timeout);
goto out;
}
icsk->icsk_ack.pending &= ~ICSK_ACK_TIMER;
if (!skb_queue_empty(&tp->ucopy.prequeue)) {//处理prequeue
struct sk_buff *skb;
__NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPSCHEDULERFAILED);
while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
sk_backlog_rcv(sk, skb);
tp->ucopy.memory = 0;
}
if (inet_csk_ack_scheduled(sk)) {//已经有ack被推迟过了
if (!icsk->icsk_ack.pingpong) { //不能延时,但却有ack被推迟了
/* Delayed ACK missed: inflate ATO. */
icsk->icsk_ack.ato = min(icsk->icsk_ack.ato << 1, icsk->icsk_rto); //double ato
} else {
/* Delayed ACK missed: leave pingpong mode and
* deflate ATO.
*/
//这一次延时ack完成了,重置
icsk->icsk_ack.pingpong = 0;
icsk->icsk_ack.ato = TCP_ATO_MIN;
}
tcp_send_ack(sk); //发送ack
__NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKS);
}
out:
if (tcp_under_memory_pressure(sk))
sk_mem_reclaim(sk);
}
static void tcp_delack_timer(unsigned long data)
{
struct sock *sk = (struct sock *)data;
bh_lock_sock(sk);
if (!sock_owned_by_user(sk)) {
tcp_delack_timer_handler(sk);
} else {
inet_csk(sk)->icsk_ack.blocked = 1;
__NET_INC_STATS(sock_net(sk), LINUX_MIB_DELAYEDACKLOCKED);
/* deleguate our work to tcp_release_cb() */
if (!test_and_set_bit(TCP_DELACK_TIMER_DEFERRED, &tcp_sk(sk)->tsq_flags)) //设置标记,等应用程序release_sock的时候调用tcp_delack_timer_handler
sock_hold(sk);
}
bh_unlock_sock(sk);
sock_put(sk);
}
void tcp_release_cb(struct sock *sk)
{
...
sock_release_ownership(sk);
...
if (flags & (1UL << TCP_DELACK_TIMER_DEFERRED)) {
tcp_delack_timer_handler(sk);
__sock_put(sk);
}
}

tcp_recvmsg

读完sk_receive_queue之后,如果没有读满应用程序缓存,则会处理prequeue和backlog队列,并从中读取。
另外,及时读满了缓存,也会在函数退出前,处理prequeue和backlog队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
int tcp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int nonblock,
int flags, int *addr_len)
{
...
lock_sock(sk); //设置sock_owned_by_user
...
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
do {
last = skb_peek_tail(&sk->sk_receive_queue);
skb_queue_walk(&sk->sk_receive_queue, skb) {
//先从sk_receive_queue读取数据
..
}
if (copied >= target && !sk->sk_backlog.tail) //得到需要的数据长度,且backlog中没有数据
break;
//非阻塞或者收到信号退出循环
...
//处理完sk_receive_queue,并判断是否需要发送ack,是则发送
tcp_cleanup_rbuf(sk, copied);
//阻塞调用, 尝试处理prequeue队列
if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) { //开启prequeue,没有设置ucopy进程,或者为当前进程
/* Install new reader */
if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
user_recv = current; //设置当前进程正在准备接受ucopy中的数据
tp->ucopy.task = user_recv;
tp->ucopy.msg = msg;
}
tp->ucopy.len = len; //设置msg中应用缓存的长度
WARN_ON(tp->copied_seq != tp->rcv_nxt &&
!(flags & (MSG_PEEK | MSG_TRUNC)));
if (!skb_queue_empty(&tp->ucopy.prequeue)) //处理prequeue
goto do_prequeue;
}
//处理完prequeue队列,开始处理backlog队列
if (copied >= target) { //达到要接收的字节数
/* Do not sleep, just process backlog. */
release_sock(sk); //处理完backlog后重新加锁
lock_sock(sk);
} else {
sk_wait_data(sk, &timeo, last); //没收到目标大小的数据,等待新数据到达sk_receive_queue。 等待前会调用release_sock处理backlog队列
}
//处理完backlog, 这时候如果是顺序数据包,ucopy中会被添加数据
if (user_recv) {
int chunk;
/* __ Restore normal policy in scheduler __ */
// 此时ucopy中的是backlog处理后的数据
chunk = len - tp->ucopy.len;
if (chunk != 0) {
NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
len -= chunk;
copied += chunk;
}
if (tp->rcv_nxt == tp->copied_seq &&
!skb_queue_empty(&tp->ucopy.prequeue)) { //因为之前处理backlog的时候,释放过锁,这时候prequeue中可能有新的包
do_prequeue:
tcp_prequeue_process(sk); //tcp_v4_do_rcv prequeue
chunk = len - tp->ucopy.len;
if (chunk != 0) {
NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
}
continue;
...
} while (len > 0);
//数据copy够了,再给一个prequeue处理机会
if (user_recv) {
if (!skb_queue_empty(&tp->ucopy.prequeue)) {
int chunk;
tp->ucopy.len = copied > 0 ? len : 0; //还有剩余空间则copy到ucopy,没有则放到sk_receive_queue中
tcp_prequeue_process(sk);
if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
NET_ADD_STATS(sock_net(sk), LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
len -= chunk;
copied += chunk;
}
}
tp->ucopy.task = NULL; //结束prequeue处理
tp->ucopy.len = 0;
}
/* According to UNIX98, msg_name/msg_namelen are ignored
* on connected socket. I was just happy when found this 8) --ANK
*/
/* Clean up data we have read: This will do ACK frames. */
tcp_cleanup_rbuf(sk, copied);
release_sock(sk); //处理backlog,释放锁
return copied;
out:
release_sock(sk);
return err;
...
}

fast path处理

在快速路径中如果当前是应用程序上下文,且ucopy中还有剩余缓存,则直接copy到ucopy中。
否则通过tcp_queue_rcv函数存放到sk_receive_queue中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
const struct tcphdr *th, unsigned int len)
{
if ((tcp_flag_word(th) & TCP_HP_BITS) == tp->pred_flags && // 快速路径包头检测
TCP_SKB_CB(skb)->seq == tp->rcv_nxt && // 非乱序包
!after(TCP_SKB_CB(skb)->ack_seq, tp->snd_nxt)) { // 确认的序号是已经发送的包
...
if (len <= tcp_header_len) { //包中没有数据
...
} else { //fast path带数据
int eaten = 0;
bool fragstolen = false;
if (tp->ucopy.task == current && //在应用程序上下文中,tcp_recvmsg,在处理prequeue或者backlog
tp->copied_seq == tp->rcv_nxt && //接收缓存没有数据了
len - tcp_header_len <= tp->ucopy.len && //应用程序缓存还有剩余
sock_owned_by_user(sk)) { //应用程序获得锁
__set_current_state(TASK_RUNNING);
if (!tcp_copy_to_iovec(sk, skb, tcp_header_len)) { //copy数据到ucopy中
/* Predicted packet is in window by definition.
* seq == rcv_nxt and rcv_wup <= rcv_nxt.
* Hence, check seq<=rcv_wup reduces to:
*/
if (tcp_header_len ==
(sizeof(struct tcphdr) +
TCPOLEN_TSTAMP_ALIGNED) &&
tp->rcv_nxt == tp->rcv_wup) //在收到这个数据包之前,没有发送包也没有收到其他数据包,并且这个包不是乱序包
tcp_store_ts_recent(tp);
tcp_rcv_rtt_measure_ts(sk, skb); //对大于等于mss的包更新rtt
__skb_pull(skb, tcp_header_len);
tcp_rcv_nxt_update(tp, TCP_SKB_CB(skb)->end_seq); //更新rcv_nxt
NET_INC_STATS(sock_net(sk),
LINUX_MIB_TCPHPHITSTOUSER);
eaten = 1;
}
}
if (!eaten) { //没有把数据放到ucopy中
if (tcp_checksum_complete(skb)) //校验
goto csum_error;
if ((int)skb->truesize > sk->sk_forward_alloc) // 超过当前的接收配额
goto step5; //进入slow path,但是可以跳过部分的头部检查
/* Predicted packet is in window by definition.
* seq == rcv_nxt and rcv_wup <= rcv_nxt.
* Hence, check seq<=rcv_wup reduces to:
*/
if (tcp_header_len ==
(sizeof(struct tcphdr) + TCPOLEN_TSTAMP_ALIGNED) &&
tp->rcv_nxt == tp->rcv_wup) //在收到这个数据包之前,没有发送包也没有收到其他数据包,并且这个包不是乱序包
tcp_store_ts_recent(tp);
tcp_rcv_rtt_measure_ts(sk, skb); //对大于等于mss的包更新rtt
NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPHPHITS);
/* Bulk data transfer: receiver */
eaten = tcp_queue_rcv(sk, skb, tcp_header_len, //添加数据到sk_receive_queue中
&fragstolen);
}
...
}
...
}
slow_path:
...
tcp_data_queue(sk, skb); //slow path数据处理
...
}

slow path处理

上面可以看到在slow path通过tcp_data_queue来处理数据
同样的在slow path如果收到是非乱序包,且在用户上下文,则尝试copy到ucopy中,否则通过tcp_queue_rcv函数存放到sk_receive_queue中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
...
/* Queue data for delivery to the user.
* Packets in sequence go to the receive queue.
* Out of sequence packets to the out_of_order_queue.
*/
if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) { //非乱序包
if (tcp_receive_window(tp) == 0) //接受窗口满了,不能接受
goto out_of_window;
/* Ok. In sequence. In window. */
if (tp->ucopy.task == current && //当前是应用程序上下文,正在tcp_recvmsg
tp->copied_seq == tp->rcv_nxt && tp->ucopy.len && //正要读取这个包,并且应用程序还有剩余缓存
sock_owned_by_user(sk) && !tp->urg_data) { //应用程序持有锁
int chunk = min_t(unsigned int, skb->len,
tp->ucopy.len);
__set_current_state(TASK_RUNNING);
if (!skb_copy_datagram_msg(skb, 0, tp->ucopy.msg, chunk)) { //copy数据到ucopy中
tp->ucopy.len -= chunk;
tp->copied_seq += chunk;
eaten = (chunk == skb->len);
tcp_rcv_space_adjust(sk); //每次copy数据到ucopy都要调用该函数,动态更新sk_rcvbuf大小
}
}
if (eaten <= 0) { //没有copy到ucopy,或者没有全部copy
queue_and_out:
if (eaten < 0) { //没有copy
if (skb_queue_len(&sk->sk_receive_queue) == 0) //sk_receive_queue没有数据
sk_forced_mem_schedule(sk, skb->truesize); //检查是否需要分配配额
else if (tcp_try_rmem_schedule(sk, skb, skb->truesize)) //查看接收缓存空间够不够
goto drop;
}
eaten = tcp_queue_rcv(sk, skb, 0, &fragstolen); //添加到sk_receive_queue
}
...
}
...
}

参考

http://blog.csdn.net/dog250/article/details/5464513