Linux RPS/RFS Implementation

Concepts

  • Why RPS (Receive Packet Steering)?
    Consider the following scenario:
    a NIC with a single receive queue raises its interrupt on one CPU, so every packet it receives is processed serially on that CPU and the other cores go unused.
    Google therefore contributed the RPS patch: when a packet is handed up to the protocol stack (netif_receive_skb_internal), the kernel consults /sys/class/net/<dev>/queues/rx-<n>/rps_cpus
    and distributes the packets of that receive queue over the configured CPU set by hash. The hash (skb->hash) is normally computed by the NIC from the packet's 4-tuple, so all packets of one flow end up on the same CPU (a minimal sketch of this hash-based steering appears at the end of this section).

  • Why not irqbalance?
    By writing /proc/irq/<irq>/smp_affinity we can pin an interrupt to particular CPUs, and the user-space irqbalance daemon adjusts these affinities according to the real-time load of each CPU, balancing interrupts across the system.
    irqbalance does spread the interrupt load over multiple cores, but because it keeps moving interrupts between CPUs, cache utilization is obviously poor.

  • Why RFS (Receive Flow Steering)?
    RPS makes use of multiple cores, but if the CPU chosen by RPS is not the CPU the application runs on, cache utilization still suffers.
    RFS is an extension patch on top of RPS that solves exactly this problem.
    Whenever the application enters the kernel through a socket system call, the flow's hash is recorded in a global hash table, mapped to the CPU the call is currently running on.
    When the next packet of that flow arrives, the kernel can look up this global table to find the application's CPU.

  • OOO (out-of-order packets)?
    When the scheduler migrates the application to another CPU, the RFS algorithm would start steering the flow's packets to that new CPU while earlier packets are still being processed in the old CPU's softirq.
    With packets of the same flow queued on two different CPUs at the same time, they can be delivered out of order.
    RFS therefore adds another table, the per-RX-queue rps_flow_table (implementation described below): if the target CPU changes but the old CPU still holds packets of this flow, new packets are not steered to the new CPU's queue yet.

In addition, some other TCP stacks keep per-CPU TCP establish tables as a cache optimization; there, steering the same flow to different CPUs would cause outright errors.
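
As a rough illustration of the plain RPS case (ignoring RFS), the sketch below shows how a flow hash selects one CPU from the configured rps_cpus set. It is a simplified, user-space model of what get_rps_cpu() does in its RPS-only fallback path, not kernel code: the tiny rps_map struct and reciprocal_scale() only mirror the kernel's names, and the sample hash and CPU list are made up.

#include <stdint.h>
#include <stdio.h>

/* Simplified stand-in for the kernel's struct rps_map (the parsed rps_cpus bitmap). */
struct rps_map {
	unsigned int len;   /* number of eligible CPUs */
	uint16_t cpus[8];   /* the eligible CPU ids    */
};

/* Same trick the kernel uses to scale a 32-bit hash into [0, len). */
static uint32_t reciprocal_scale(uint32_t hash, uint32_t len)
{
	return (uint32_t)(((uint64_t)hash * len) >> 32);
}

int main(void)
{
	/* Assume rps_cpus selected CPUs 0,2,4,6 for this RX queue. */
	struct rps_map map = { .len = 4, .cpus = { 0, 2, 4, 6 } };
	uint32_t hash = 0x9e3779b9;	/* made-up skb->hash of one flow */

	/* Every packet of this flow carries the same hash, hence lands on the same CPU. */
	printf("flow steered to CPU %u\n",
	       (unsigned int)map.cpus[reciprocal_scale(hash, map.len)]);
	return 0;
}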

sock_rps_record_flow

When the application enters the kernel via a socket system call (inet_sendmsg(), inet_recvmsg(), inet_accept(), etc.), sock_rps_record_flow() records the flow in the global hash table rps_sock_flow_table.
The key is sk->sk_rxhash (i.e. skb->hash) reduced to an index into the array; the value is an unsigned 32-bit integer whose low bits (masked by rps_cpu_mask) hold the current CPU and whose upper bits hold the corresponding upper part of the hash.

static inline void sock_rps_record_flow(const struct sock *sk)
{
#ifdef CONFIG_RPS
	sock_rps_record_flow_hash(sk->sk_rxhash);
#endif
}

static inline void sock_rps_record_flow_hash(__u32 hash)
{
#ifdef CONFIG_RPS
	struct rps_sock_flow_table *sock_flow_table;

	rcu_read_lock();
	sock_flow_table = rcu_dereference(rps_sock_flow_table);
	rps_record_sock_flow(sock_flow_table, hash);
	rcu_read_unlock();
#endif
}

static inline void rps_record_sock_flow(struct rps_sock_flow_table *table,
					u32 hash)
{
	if (table && hash) {
		unsigned int index = hash & table->mask; /* reduce the hash to an array index */
		u32 val = hash & ~rps_cpu_mask; /* the entry has two parts: low rps_cpu_mask bits = CPU, upper bits = hash */

		/* We only give a hint, preemption can change CPU under us */
		val |= raw_smp_processor_id(); /* entry = hash part | current CPU */

		if (table->ents[index] != val)
			table->ents[index] = val;
	}
}
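
To make the entry layout concrete, here is a small, self-contained sketch of how an entry is encoded by rps_record_sock_flow() and later checked for collisions in get_rps_cpu(). The values are made up; an rps_cpu_mask of 0x3f corresponds to the 64-possible-CPU example in the kernel comment below.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	uint32_t rps_cpu_mask = 0x3f;	/* roundup_pow_of_two(64) - 1: low 6 bits carry the CPU */
	uint32_t hash = 0xdeadbeef;	/* made-up flow hash (skb->hash / sk->sk_rxhash)        */
	uint32_t cpu  = 5;		/* CPU the application last ran its system call on     */

	/* Encoding done by rps_record_sock_flow(): upper hash bits | current CPU. */
	uint32_t ent = (hash & ~rps_cpu_mask) | cpu;

	/* Decoding done by get_rps_cpu() for an incoming packet of the same flow. */
	uint32_t next_cpu = ent & rps_cpu_mask;

	/* Collision check: if the upper bits don't match the packet's hash,
	 * a different flow owns this slot and RFS falls back to plain RPS. */
	int collided = ((ent ^ hash) & ~rps_cpu_mask) != 0;

	printf("ent=0x%08x next_cpu=%u collided=%d\n", ent, next_cpu, collided);
	return 0;
}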

rps_sock_flow_table

The size of the rps_sock_flow_table hash table is configured through /proc/sys/net/core/rps_sock_flow_entries.
Writing that sysctl invokes rps_sock_flow_sysctl(), which allocates the table (the requested size is rounded up to a power of two) and sets rps_cpu_mask to roundup_pow_of_two(nr_cpu_ids) - 1.

/*
 * The rps_sock_flow_table contains mappings of flows to the last CPU
 * on which they were processed by the application (set in recvmsg).
 * Each entry is a 32bit value. Upper part is the high-order bits
 * of flow hash, lower part is CPU number.
 * rps_cpu_mask is used to partition the space, depending on number of
 * possible CPUs : rps_cpu_mask = roundup_pow_of_two(nr_cpu_ids) - 1
 * For example, if 64 CPUs are possible, rps_cpu_mask = 0x3f,
 * meaning we use 32-6=26 bits for the hash.
 */
struct rps_sock_flow_table {
	u32	mask;
	u32	ents[0] ____cacheline_aligned_in_smp;
};

static int rps_sock_flow_sysctl(struct ctl_table *table, int write,
				void __user *buffer, size_t *lenp, loff_t *ppos)
{
	unsigned int orig_size, size;
	int ret, i;
	struct ctl_table tmp = {
		.data = &size,
		.maxlen = sizeof(size),
		.mode = table->mode
	};
	struct rps_sock_flow_table *orig_sock_table, *sock_table;
	static DEFINE_MUTEX(sock_flow_mutex);

	mutex_lock(&sock_flow_mutex);

	orig_sock_table = rcu_dereference_protected(rps_sock_flow_table,
					lockdep_is_held(&sock_flow_mutex));
	size = orig_size = orig_sock_table ? orig_sock_table->mask + 1 : 0;

	ret = proc_dointvec(&tmp, write, buffer, lenp, ppos);

	if (write) {
		if (size) {
			if (size > 1<<29) {
				/* Enforce limit to prevent overflow */
				mutex_unlock(&sock_flow_mutex);
				return -EINVAL;
			}
			size = roundup_pow_of_two(size);
			if (size != orig_size) {
				sock_table =
				    vmalloc(RPS_SOCK_FLOW_TABLE_SIZE(size));
				if (!sock_table) {
					mutex_unlock(&sock_flow_mutex);
					return -ENOMEM;
				}
				rps_cpu_mask = roundup_pow_of_two(nr_cpu_ids) - 1; /* the low rps_cpu_mask bits of each entry hold the CPU */
				sock_table->mask = size - 1;
			} else
				sock_table = orig_sock_table;

			for (i = 0; i < size; i++)
				sock_table->ents[i] = RPS_NO_CPU;
		} else
			sock_table = NULL;

		if (sock_table != orig_sock_table) {
			rcu_assign_pointer(rps_sock_flow_table, sock_table);
			if (sock_table)
				static_key_slow_inc(&rps_needed);
			if (orig_sock_table) {
				static_key_slow_dec(&rps_needed);
				synchronize_rcu();
				vfree(orig_sock_table);
			}
		}
	}

	mutex_unlock(&sock_flow_mutex);

	return ret;
}
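
For reference, the two helpers used above are defined in the same kernel generation roughly as below (quoted from memory of include/linux/netdevice.h, so treat the exact form as approximate):

/* size in bytes of the flexible-array table: header plus _num u32 entries */
#define RPS_SOCK_FLOW_TABLE_SIZE(_num) (offsetof(struct rps_sock_flow_table, ents[_num]))

/* marker for an entry that has not recorded a CPU yet */
#define RPS_NO_CPU 0xffff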

netif_receive_skb_internal

netif_receive_skb_internal() calls get_rps_cpu() to pick the best CPU according to the RPS/RFS algorithm, and then hands the packet to that CPU for processing.

static int netif_receive_skb_internal(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();
#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu = get_rps_cpu(skb->dev, skb, &rflow); /* pick a CPU via the RPS/RFS algorithm */

		if (cpu >= 0) {
			/* enqueue the skb on the chosen CPU's softnet_data; the remote
			 * CPU is later kicked via IPI from this CPU's softirq */
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
	ret = __netif_receive_skb(skb); /* no RPS: process directly in the protocol stack */
	rcu_read_unlock();
	return ret;
}

get_rps_cpu

get_rps_cpu() is the core of RPS/RFS CPU selection.
With RPS/RFS configured, it works as follows:

  • first look up the CPU the application is running on (next_cpu) in the global rps_sock_flow_table
  • if the rps_sock_flow_table entry collided (it belongs to a different flow), fall back to plain RPS and pick a CPU from rps_cpus by hash
  • then check whether the application has been migrated to another CPU; if so, check whether all of this flow's packets on the previous CPU have already been processed
  • if the previous CPU has drained the flow's packets, steer the packet to the new CPU and update rps_flow_table
  • if the previous CPU still has unprocessed packets of the flow, keep steering to the old CPU
/*
 * get_rps_cpu is called from netif_receive_skb and returns the target
 * CPU from the RPS map of the receiving queue for a given skb.
 * rcu_read_lock must be held on entry.
 */
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
		       struct rps_dev_flow **rflowp)
{
	const struct rps_sock_flow_table *sock_flow_table;
	struct netdev_rx_queue *rxqueue = dev->_rx;
	struct rps_dev_flow_table *flow_table;
	struct rps_map *map;
	int cpu = -1;
	u32 tcpu;
	u32 hash;

	if (skb_rx_queue_recorded(skb)) { /* the RX queue is recorded in the skb */
		u16 index = skb_get_rx_queue(skb);

		if (unlikely(index >= dev->real_num_rx_queues)) {
			WARN_ONCE(dev->real_num_rx_queues > 1,
				  "%s received packet on queue %u, but number "
				  "of RX queues is %u\n",
				  dev->name, index, dev->real_num_rx_queues);
			goto done;
		}
		rxqueue += index; /* the netdev_rx_queue of this RX queue */
	}

	/* Avoid computing hash if RFS/RPS is not active for this rxqueue */

	flow_table = rcu_dereference(rxqueue->rps_flow_table); /* per-RX-queue rps_flow_table */
	map = rcu_dereference(rxqueue->rps_map); /* CPUs configured in this queue's rps_cpus */
	if (!flow_table && !map) /* neither configured: RPS/RFS is not used */
		goto done;

	skb_reset_network_header(skb);
	hash = skb_get_hash(skb);
	if (!hash)
		goto done;

	sock_flow_table = rcu_dereference(rps_sock_flow_table);
	if (flow_table && sock_flow_table) {
		struct rps_dev_flow *rflow;
		u32 next_cpu;
		u32 ident;

		/* First check into global flow table if there is a match */
		ident = sock_flow_table->ents[hash & sock_flow_table->mask];
		if ((ident ^ hash) & ~rps_cpu_mask) /* the hash part of ident differs from the skb hash: two flows collided on this entry */
			goto try_rps; /* on collision fall back to plain RPS CPU selection */

		next_cpu = ident & rps_cpu_mask; /* the CPU recorded by RFS */

		/* OK, now we know there is a match,
		 * we can look at the local (per receive queue) flow table
		 */
		rflow = &flow_table->flows[hash & flow_table->mask];
		tcpu = rflow->cpu; /* initialized to RPS_NO_CPU */

		/*
		 * If the desired CPU (where last recvmsg was done) is
		 * different from current CPU (one in the rx-queue flow
		 * table entry), switch if one of the following holds:
		 *   - Current CPU is unset (>= nr_cpu_ids).
		 *   - Current CPU is offline.
		 *   - The current CPU's queue tail has advanced beyond the
		 *     last packet that was enqueued using this table entry.
		 *     This guarantees that all previous packets for the flow
		 *     have been dequeued, thus preserving in order delivery.
		 */
		if (unlikely(tcpu != next_cpu) && /* recorded CPU differs from the application's CPU: the application was migrated */
		    (tcpu >= nr_cpu_ids || !cpu_online(tcpu) || /* the recorded CPU is invalid or has gone offline */
		     ((int)(per_cpu(softnet_data, tcpu).input_queue_head -
		      rflow->last_qtail)) >= 0)) { /* the old CPU (tcpu) has already drained this flow's packets */
			tcpu = next_cpu; /* so it is safe to steer packets to the new CPU */
			rflow = set_rps_cpu(dev, skb, rflow, next_cpu); /* record the application's new CPU in rps_flow_table */
		}

		if (tcpu < nr_cpu_ids && cpu_online(tcpu)) {
			*rflowp = rflow;
			cpu = tcpu; /* if the old CPU still has pending packets, this keeps returning the old CPU */
			goto done;
		}
	}

try_rps:

	if (map) { /* plain RPS: use this RX queue's rps_cpus configuration */
		tcpu = map->cpus[reciprocal_scale(hash, map->len)]; /* pick one of the configured CPUs by hash */
		if (cpu_online(tcpu)) {
			cpu = tcpu;
			goto done;
		}
	}

done:
	return cpu;
}
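
set_rps_cpu() is not shown above; in the non-accelerated case it essentially records the new CPU in the rps_flow_table entry and snapshots that CPU's input_queue_head into last_qtail. The sketch below captures that core, quoted from memory of the same kernel generation with the CONFIG_RFS_ACCEL hardware-steering branch stripped out, so treat the details as approximate:

static struct rps_dev_flow *
set_rps_cpu(struct net_device *dev, struct sk_buff *skb,
	    struct rps_dev_flow *rflow, u16 next_cpu)
{
	if (next_cpu < nr_cpu_ids) {
		/* (CONFIG_RFS_ACCEL hardware flow steering omitted here) */

		/* Remember where the new CPU's queue currently stands; packets of
		 * this flow enqueued from now on come "after" this point. */
		rflow->last_qtail =
			per_cpu(softnet_data, next_cpu).input_queue_head;
	}

	rflow->cpu = next_cpu;
	return rflow;
}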

rps_flow_table & OOO

The code above introduced rps_flow_table, a per-RX-queue table used to handle the OOO problem.
The key pieces are last_qtail in rps_dev_flow together with input_queue_head and input_queue_tail in softnet_data, which jointly track the lifetime of a flow's packets in the current CPU's queue.

struct netdev_rx_queue {
#ifdef CONFIG_RPS
	struct rps_map __rcu		*rps_map;
	struct rps_dev_flow_table __rcu	*rps_flow_table;
#endif
	struct kobject			kobj;
	struct net_device		*dev;
} ____cacheline_aligned_in_smp;

/*
 * This structure holds an RPS map which can be of variable length.  The
 * map is an array of CPUs.
 */
struct rps_map { /* parsed rps_cpus configuration */
	unsigned int len;
	struct rcu_head rcu;
	u16 cpus[0];
};

/*
 * The rps_dev_flow structure contains the mapping of a flow to a CPU, the
 * tail pointer for that CPU's input queue at the time of last enqueue, and
 * a hardware filter index.
 */
struct rps_dev_flow {
	u16 cpu;
	u16 filter;
	unsigned int last_qtail; /* value of input_queue_tail the last time a skb of this flow was enqueued to input_pkt_queue */
};

/*
 * The rps_dev_flow_table structure contains a table of flow mappings.
 */
struct rps_dev_flow_table {
	unsigned int mask;
	struct rcu_head rcu;
	struct rps_dev_flow flows[0];
};

struct softnet_data {
	struct list_head	poll_list;
	struct sk_buff_head	process_queue;

	/* stats */
	unsigned int		processed;
	unsigned int		time_squeeze;
	unsigned int		received_rps;
#ifdef CONFIG_RPS
	struct softnet_data	*rps_ipi_list;
#endif
#ifdef CONFIG_NET_FLOW_LIMIT
	struct sd_flow_limit __rcu *flow_limit;
#endif
	struct Qdisc		*output_queue;
	struct Qdisc		**output_queue_tailp;
	struct sk_buff		*completion_queue;

#ifdef CONFIG_RPS
	/* input_queue_head should be written by cpu owning this struct,
	 * and only read by other cpus. Worth using a cache line.
	 */
	unsigned int		input_queue_head ____cacheline_aligned_in_smp; /* incremented whenever a packet is dequeued from input_pkt_queue for processing */

	/* Elements below can be accessed between CPUs for RPS/RFS */
	struct call_single_data	csd ____cacheline_aligned_in_smp;
	struct softnet_data	*rps_ipi_next;
	unsigned int		cpu;
	unsigned int		input_queue_tail; /* incremented on every enqueue to input_pkt_queue */
#endif
	unsigned int		dropped;
	struct sk_buff_head	input_pkt_queue;
	struct napi_struct	backlog;
};

enqueue_to_backlog & process_backlog

Once get_rps_cpu() has chosen a CPU (with RPS enabled), enqueue_to_backlog() queues the packet on that CPU's softnet_data input_pkt_queue.
After the packet is queued, sd->backlog is scheduled as a NAPI; its poll function is process_backlog(), which dequeues the skbs from input_pkt_queue.

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
			      unsigned int *qtail)
{
	struct softnet_data *sd;
	unsigned long flags;
	unsigned int qlen;

	sd = &per_cpu(softnet_data, cpu); /* the softnet_data of the CPU chosen by RPS */

	local_irq_save(flags);

	rps_lock(sd);
	if (!netif_running(skb->dev))
		goto drop;
	qlen = skb_queue_len(&sd->input_pkt_queue);
	if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
		if (qlen) {
enqueue:
			__skb_queue_tail(&sd->input_pkt_queue, skb); /* append to input_pkt_queue */
			input_queue_tail_incr_save(sd, qtail);
			rps_unlock(sd);
			local_irq_restore(flags);
			return NET_RX_SUCCESS;
		}

		/* This is the first packet on the target CPU's input_pkt_queue, so its
		 * backlog NAPI must be scheduled; a remote CPU is kicked via IPI.
		 */
		/* Schedule NAPI for backlog device
		 * We can use non atomic operation since we own the queue lock
		 */
		if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) { /* backlog not yet scheduled */
			if (!rps_ipi_queued(sd)) /* sd belongs to a remote CPU: chain it onto the local sd's rps_ipi_list and raise the local softirq */
				____napi_schedule(sd, &sd->backlog); /* sd is the local CPU's: schedule the NAPI directly */
		}
		goto enqueue;
	}

drop:
	sd->dropped++;
	rps_unlock(sd);

	local_irq_restore(flags);

	atomic_long_inc(&skb->dev->rx_dropped);
	kfree_skb(skb);
	return NET_RX_DROP;
}

static int process_backlog(struct napi_struct *napi, int quota)
{
	struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
	bool again = true;
	int work = 0;

	/* Check if we have pending ipi, its better to send them now,
	 * not waiting net_rx_action() end.
	 */
	if (sd_has_rps_ipi_waiting(sd)) { /* some packets were steered to other CPUs, whose NAPIs need waking */
		local_irq_disable();
		net_rps_action_and_irq_enable(sd); /* kick those CPUs via IPI so they start processing */
	}

	napi->weight = weight_p;
	while (again) {
		struct sk_buff *skb;

		while ((skb = __skb_dequeue(&sd->process_queue))) {
			rcu_read_lock();
			__netif_receive_skb(skb);
			rcu_read_unlock();
			input_queue_head_incr(sd); /* advance input_queue_head on every dequeue; once it equals input_queue_tail the queue is drained */
			if (++work >= quota)
				return work;
		}

		local_irq_disable();
		rps_lock(sd); /* needed because RPS lets other CPUs enqueue into our input_pkt_queue */
		if (skb_queue_empty(&sd->input_pkt_queue)) {
			/*
			 * Inline a custom version of __napi_complete().
			 * only current cpu owns and manipulates this napi,
			 * and NAPI_STATE_SCHED is the only possible flag set
			 * on backlog.
			 * We can use a plain write instead of clear_bit(),
			 * and we dont need an smp_mb() memory barrier.
			 */
			napi->state = 0;
			again = false;
		} else {
			skb_queue_splice_tail_init(&sd->input_pkt_queue, /* move input_pkt_queue onto process_queue */
						   &sd->process_queue); /* process_queue exists to keep the rps_lock critical section short */
		}
		rps_unlock(sd);
		local_irq_enable();
	}

	return work;
}
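
rps_ipi_queued(), referenced above, is what defers the cross-CPU kick: if the target softnet_data belongs to another CPU, it is linked onto the local CPU's rps_ipi_list and NET_RX_SOFTIRQ is raised locally, so the actual IPIs are sent later from net_rps_action_and_irq_enable(). A sketch from memory of the same kernel generation (details approximate):

static int rps_ipi_queued(struct softnet_data *sd)
{
#ifdef CONFIG_RPS
	struct softnet_data *mysd = this_cpu_ptr(&softnet_data);

	if (sd != mysd) {
		/* Remote CPU: remember its sd on the local rps_ipi_list ... */
		sd->rps_ipi_next = mysd->rps_ipi_list;
		mysd->rps_ipi_list = sd;

		/* ... and make sure our own softirq runs so the IPI gets sent. */
		__raise_softirq_irqoff(NET_RX_SOFTIRQ);
		return 1;
	}
#endif /* CONFIG_RPS */
	return 0;
}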

input_queue_tail_incr_save & input_queue_head_incr

Every time a packet is inserted into input_pkt_queue, input_queue_tail is incremented and its new value is stored into last_qtail of the rps_dev_flow returned by get_rps_cpu().
last_qtail therefore records the value of input_queue_tail at the moment the flow's packet entered input_pkt_queue.
Every time a packet is dequeued from input_pkt_queue/process_queue, input_queue_head is incremented.
So input_queue_head == input_queue_tail means the queue is empty,
and input_queue_head >= last_qtail means all of that flow's packets have already been processed, which is exactly how the OOO problem described earlier is avoided.

static inline void input_queue_tail_incr_save(struct softnet_data *sd,
					      unsigned int *qtail)
{
#ifdef CONFIG_RPS
	*qtail = ++sd->input_queue_tail; /* incremented on every enqueue */
#endif
}

static inline void input_queue_head_incr(struct softnet_data *sd)
{
#ifdef CONFIG_RPS
	sd->input_queue_head++;
#endif
}
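
Note that in get_rps_cpu() the drain test is written as ((int)(input_queue_head - last_qtail)) >= 0: both counters are free-running unsigned values, and taking the difference as a signed number keeps the comparison correct across wraparound. A small self-contained demo (the counter values are made up):

#include <stdio.h>

/* Returns 1 when all packets of the flow enqueued up to last_qtail
 * have been dequeued, i.e. the old CPU has drained this flow. */
static int flow_drained(unsigned int input_queue_head, unsigned int last_qtail)
{
	return (int)(input_queue_head - last_qtail) >= 0;
}

int main(void)
{
	/* The flow's last packet was enqueued when the tail counter was 100. */
	printf("%d\n", flow_drained(98, 100));  /* 0: two packets still pending */
	printf("%d\n", flow_drained(103, 100)); /* 1: flow fully drained        */

	/* The check also works across counter wraparound. */
	printf("%d\n", flow_drained(5, 0xfffffff0u)); /* 1: head passed tail despite the wrap */
	return 0;
}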

net_rx_action

static __latent_entropy void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	unsigned long time_limit = jiffies + 2;
	int budget = netdev_budget;
	LIST_HEAD(list);
	LIST_HEAD(repoll);

	local_irq_disable();
	list_splice_init(&sd->poll_list, &list);
	local_irq_enable();

	for (;;) {
		struct napi_struct *n;

		if (list_empty(&list)) { /* no NAPI left to poll on this CPU */
			if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll)) /* and no packets were steered to other CPUs that still need an IPI */
				return; /* nothing to do */
			break; /* otherwise fall through so net_rps_action_and_irq_enable() can send the pending IPIs */
		}

		n = list_first_entry(&list, struct napi_struct, poll_list);
		budget -= napi_poll(n, &repoll); /* for the sd->backlog NAPI this ends up in process_backlog() */

		/* If softirq window is exhausted then punt.
		 * Allow this to run for 2 jiffies since which will allow
		 * an average latency of 1.5/HZ.
		 */
		if (unlikely(budget <= 0 ||
			     time_after_eq(jiffies, time_limit))) {
			sd->time_squeeze++;
			break;
		}
	}

	__kfree_skb_flush();
	local_irq_disable();

	list_splice_tail_init(&sd->poll_list, &list);
	list_splice_tail(&repoll, &list);
	list_splice(&list, &sd->poll_list);
	if (!list_empty(&sd->poll_list))
		__raise_softirq_irqoff(NET_RX_SOFTIRQ);

	net_rps_action_and_irq_enable(sd);
}

static void net_rps_action_and_irq_enable(struct softnet_data *sd)
{
#ifdef CONFIG_RPS
	struct softnet_data *remsd = sd->rps_ipi_list;

	if (remsd) { /* this CPU has steered packets to remote CPUs' backlogs */
		sd->rps_ipi_list = NULL;

		local_irq_enable();

		/* Send pending IPI's to kick RPS processing on remote cpus. */
		while (remsd) { /* raise the softirq on each remote CPU via IPI */
			struct softnet_data *next = remsd->rps_ipi_next;

			if (cpu_online(remsd->cpu))
				smp_call_function_single_async(remsd->cpu,
							       &remsd->csd); /* runs rps_trigger_softirq on remsd->cpu */
			remsd = next;
		}
	} else
#endif
		local_irq_enable();
}

/* Called from hardirq (IPI) context */
static void rps_trigger_softirq(void *data)
{
	struct softnet_data *sd = data;

	____napi_schedule(sd, &sd->backlog);
	sd->received_rps++;
}
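
The csd passed to smp_call_function_single_async() is bound to rps_trigger_softirq() when each per-CPU softnet_data is set up in net_dev_init(); roughly (quoted from memory, details approximate):

/* excerpt from net_dev_init(), inside the for_each_possible_cpu(i) loop */
#ifdef CONFIG_RPS
	sd->csd.func = rps_trigger_softirq; /* what the IPI runs on the remote CPU */
	sd->csd.info = sd;                  /* passed to rps_trigger_softirq as 'data' */
	sd->cpu = i;
#endif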

Configuration

// CPUs that packets from this RX queue may be steered to
/sys/class/net/<dev>/queues/rx-<n>/rps_cpus
// size of the global table that records, per flow, the CPU of the application's last socket system call
/proc/sys/net/core/rps_sock_flow_entries
// size of the per-RX-queue flow table; typically, with rps_sock_flow_entries = 32768 and N queues, rps_flow_cnt can be 32768/N
/sys/class/net/<dev>/queues/rx-<n>/rps_flow_cnt

References

rps: Receive Packet Steering
rfs: Receive Flow Steering
Documentation/networking/scaling.txt