Linux GRO implementation

Concepts

GRO (Generic Receive Offload): merge small packets into larger ones inside napi poll before handing them up to the protocol stack
LRO: the hardware counterpart of GRO (implemented through the NIC's RSC feature)

http://lwn.net/Articles/358910/

Why GRO? When should it be enabled?

  • Merging small packets into one large packet before handing it to the protocol stack saves work all along the stack path: fewer sock lock acquisitions, fewer event notifications and therefore fewer application system calls, and so on.
  • Because delivery to the protocol stack is delayed, latency-sensitive interactive applications, especially those exchanging small packets, should disable GRO (a sketch for toggling GRO follows this list).
  • On networks with heavy jitter and reordering, such as wireless, enabling GRO avoids the immediate duplicate ACKs that out-of-order arrival would otherwise trigger. Without GRO, those ACKs can mislead the sender's congestion-control algorithm and sharply reduce TCP link utilization.
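
GRO is normally toggled with `ethtool -K <dev> gro on|off`. As a minimal sketch of doing the same programmatically, the program below uses the legacy ETHTOOL_SGRO ioctl; the device name eth0 is just an example, and the program needs CAP_NET_ADMIN to succeed.

/* Minimal sketch: disable GRO on eth0 via the legacy ethtool ioctl,
 * equivalent to `ethtool -K eth0 gro off`. Requires CAP_NET_ADMIN. */
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <net/if.h>
#include <linux/ethtool.h>
#include <linux/sockios.h>

int main(void)
{
	struct ethtool_value eval = { .cmd = ETHTOOL_SGRO, .data = 0 }; /* 0 = off, 1 = on */
	struct ifreq ifr;
	int fd = socket(AF_INET, SOCK_DGRAM, 0);

	memset(&ifr, 0, sizeof(ifr));
	strncpy(ifr.ifr_name, "eth0", IFNAMSIZ - 1); /* example device name */
	ifr.ifr_data = (void *)&eval;

	if (ioctl(fd, SIOCETHTOOL, &ifr) < 0)
		perror("ETHTOOL_SGRO");
	close(fd);
	return 0;
}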

napi_poll

For the ixgbe driver, when an interrupt fires, the handler ixgbe_msix_clean_rings() runs and raises the NET_RX_SOFTIRQ softirq.
The softirq handler net_rx_action() then does the real work, mainly by calling napi_poll().

napi_poll() mainly calls ixgbe_poll(), which receives skbs from the NIC and then either merges them via GRO or hands them to the protocol stack.
If ixgbe_poll() uses up its whole budget, I/O is busy: aged GRO packets are flushed to the protocol stack immediately, and the NAPI instance is queued for another poll without waiting for the next interrupt.

static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
	void *have;
	int work, weight;

	list_del_init(&n->poll_list);

	have = netpoll_poll_lock(n);

	weight = n->weight;

	/* This NAPI_STATE_SCHED test is for avoiding a race
	 * with netpoll's poll_napi(). Only the entity which
	 * obtains the lock and sees NAPI_STATE_SCHED set will
	 * actually make the ->poll() call. Therefore we avoid
	 * accidentally calling ->poll() when NAPI is not scheduled.
	 */
	work = 0;
	if (test_bit(NAPI_STATE_SCHED, &n->state)) {
		work = n->poll(n, weight); // ixgbe_poll
		trace_napi_poll(n, work, weight);
	}

	WARN_ON_ONCE(work > weight);

	if (likely(work < weight)) // budget not exhausted: done
		goto out_unlock;

	// the driver consumed the whole budget: flush GRO and prepare the next poll
	/* Drivers must not modify the NAPI state if they
	 * consume the entire weight. In such cases this code
	 * still "owns" the NAPI instance and therefore can
	 * move the instance around on the list at-will.
	 */
	if (unlikely(napi_disable_pending(n))) {
		napi_complete(n);
		goto out_unlock;
	}

	if (n->gro_list) {
		/* flush too old packets
		 * If HZ < 1000, flush all packets.
		 */
		napi_gro_flush(n, HZ >= 1000); // hand the aged packets to the protocol stack
	}

	/* Some drivers may have called napi_schedule
	 * prior to exhausting their budget.
	 */
	if (unlikely(!list_empty(&n->poll_list))) {
		pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
			     n->dev ? n->dev->name : "backlog");
		goto out_unlock;
	}

	list_add_tail(&n->poll_list, repoll); // whole budget consumed: repoll without waiting for an interrupt

out_unlock:
	netpoll_poll_unlock(have);

	return work;
}

ixgbe_poll

ixgbe_poll() mainly calls ixgbe_clean_rx_irq() to read skbs from the receive queues bound to this interrupt vector, and to run GRO on them or hand them to the protocol stack.

/**
 * ixgbe_poll - NAPI Rx polling callback
 * @napi: structure for representing this polling device
 * @budget: how many packets driver is allowed to clean
 *
 * This function is used for legacy and MSI, NAPI mode
 **/
int ixgbe_poll(struct napi_struct *napi, int budget)
{
	struct ixgbe_q_vector *q_vector =
				container_of(napi, struct ixgbe_q_vector, napi);
	struct ixgbe_adapter *adapter = q_vector->adapter;
	struct ixgbe_ring *ring;
	int per_ring_budget, work_done = 0;
	bool clean_complete = true;

	... // reclaiming of completed Tx resources (elided)

	/* attempt to distribute budget to each queue fairly, but don't allow
	 * the budget to go below 1 because we'll exit polling */
	if (q_vector->rx.count > 1)
		per_ring_budget = max(budget/q_vector->rx.count, 1);
	else
		per_ring_budget = budget;

	ixgbe_for_each_ring(ring, q_vector->rx) {
		int cleaned = ixgbe_clean_rx_irq(q_vector, ring,
						 per_ring_budget);

		work_done += cleaned; // number of packets received
		if (cleaned >= per_ring_budget)
			clean_complete = false; // some Rx queue reached its share of the budget
	}

	ixgbe_qv_unlock_napi(q_vector); // flush the whole gro_list to the protocol stack

	/* If all work not completed, return budget and keep polling */
	if (!clean_complete)
		return budget;

	/* all work done, exit the polling mode */
	// no queue used up its budget, so there is no need to poll again for now
	napi_complete_done(napi, work_done); // NAPI done: remove it from poll_list
	if (adapter->rx_itr_setting & 1)
		ixgbe_set_itr(q_vector);
	if (!test_bit(__IXGBE_DOWN, &adapter->state))
		ixgbe_irq_enable_queues(adapter, BIT_ULL(q_vector->v_idx));

	return min(work_done, budget - 1);
}

static inline void ixgbe_qv_unlock_napi(struct ixgbe_q_vector *q_vector)
{
	WARN_ON(atomic_read(&q_vector->state) != IXGBE_QV_STATE_NAPI);

	/* flush any outstanding Rx frames */
	if (q_vector->napi.gro_list)
		napi_gro_flush(&q_vector->napi, false); // flush everything to the protocol stack

	/* reset state to idle */
	atomic_set(&q_vector->state, IXGBE_QV_STATE_IDLE);
}

// complete the NAPI instance and remove it from poll_list
void napi_complete_done(struct napi_struct *n, int work_done)
{
	unsigned long flags;

	/*
	 * don't let napi dequeue from the cpu poll list
	 * just in case its running on a different cpu
	 */
	if (unlikely(test_bit(NAPI_STATE_NPSVC, &n->state)))
		return;

	if (n->gro_list) { // already flushed completely in ixgbe_qv_unlock_napi
		unsigned long timeout = 0;

		if (work_done)
			timeout = n->dev->gro_flush_timeout;

		if (timeout)
			hrtimer_start(&n->timer, ns_to_ktime(timeout), // arm a timer; napi_watchdog raises the softirq if the gro_list still needs processing
				      HRTIMER_MODE_REL_PINNED);
		else
			napi_gro_flush(n, false); // flush everything to the protocol stack
	}
	if (likely(list_empty(&n->poll_list))) {
		WARN_ON_ONCE(!test_and_clear_bit(NAPI_STATE_SCHED, &n->state));
	} else {
		/* If n->poll_list is not empty, we need to mask irqs */
		local_irq_save(flags);
		__napi_complete(n); // remove from poll_list
		local_irq_restore(flags);
	}
}
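
The gro_flush_timeout consulted above is a per-device tunable exposed through sysfs, in nanoseconds. A minimal sketch for setting it, assuming a device named eth0 and an arbitrary example value of 20000 ns:

/* Minimal sketch: set gro_flush_timeout (nanoseconds) from userspace.
 * eth0 and the 20000 ns value are examples only; requires root. */
#include <stdio.h>

int main(void)
{
	FILE *f = fopen("/sys/class/net/eth0/gro_flush_timeout", "w");

	if (!f) {
		perror("fopen");
		return 1;
	}
	fprintf(f, "20000\n");
	fclose(f);
	return 0;
}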

ixgbe_clean_rx_irq

After reading an skb from the NIC receive queue, ixgbe_clean_rx_irq() calls ixgbe_rx_skb() to run GRO on it or hand it to the protocol stack.

/**
 * ixgbe_clean_rx_irq - Clean completed descriptors from Rx ring - bounce buf
 * @q_vector: structure containing interrupt and ring information
 * @rx_ring: rx descriptor ring to transact packets on
 * @budget: Total limit on number of packets to process
 *
 * This function provides a "bounce buffer" approach to Rx interrupt
 * processing. The advantage to this is that on systems that have
 * expensive overhead for IOMMU access this provides a means of avoiding
 * it by maintaining the mapping of the page to the system.
 *
 * Returns amount of work completed
 **/
static int ixgbe_clean_rx_irq(struct ixgbe_q_vector *q_vector,
			      struct ixgbe_ring *rx_ring,
			      const int budget)
{
	unsigned int total_rx_bytes = 0, total_rx_packets = 0;
	u16 cleaned_count = ixgbe_desc_unused(rx_ring);

	while (likely(total_rx_packets < budget)) {
		union ixgbe_adv_rx_desc *rx_desc;
		struct sk_buff *skb;

		/* return some buffers to hardware, one at a time is too slow */
		if (cleaned_count >= IXGBE_RX_BUFFER_WRITE) {
			ixgbe_alloc_rx_buffers(rx_ring, cleaned_count);
			cleaned_count = 0;
		}

		rx_desc = IXGBE_RX_DESC(rx_ring, rx_ring->next_to_clean);

		if (!rx_desc->wb.upper.status_error)
			break;

		/* This memory barrier is needed to keep us from reading
		 * any other fields out of the rx_desc until we know the
		 * descriptor has been written back
		 */
		dma_rmb();

		/* retrieve a buffer from the ring */
		skb = ixgbe_fetch_rx_buffer(rx_ring, rx_desc); // build the skb

		/* exit if we failed to retrieve a buffer */
		if (!skb)
			break;

		cleaned_count++;

		/* place incomplete frames back on ring for completion */
		if (ixgbe_is_non_eop(rx_ring, rx_desc, skb))
			continue;

		/* verify the packet layout is correct */
		if (ixgbe_cleanup_headers(rx_ring, rx_desc, skb))
			continue;

		/* probably a little skewed due to removing CRC */
		total_rx_bytes += skb->len;

		/* populate checksum, timestamp, VLAN, and protocol */
		ixgbe_process_skb_fields(rx_ring, rx_desc, skb);

		ixgbe_rx_skb(q_vector, skb); // run GRO, or hand the skb to the protocol stack

		/* update budget accounting */
		total_rx_packets++;
	}

	return total_rx_packets;
}

ixgbe_rx_skb & napi_gro_receive

static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
			 struct sk_buff *skb)
{
	skb_mark_napi_id(skb, &q_vector->napi);
	if (ixgbe_qv_busy_polling(q_vector)) // busy polling is active
		netif_receive_skb(skb); // a busy-polling reader gets the skb directly, bypassing GRO
	else
		napi_gro_receive(&q_vector->napi, skb); // default path: GRO processing
}

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	skb_mark_napi_id(skb, napi);
	skb_gro_reset_offset(skb);

	return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}

static void skb_gro_reset_offset(struct sk_buff *skb)
{
	const struct skb_shared_info *pinfo = skb_shinfo(skb);
	const skb_frag_t *frag0 = &pinfo->frags[0];

	NAPI_GRO_CB(skb)->data_offset = 0;
	NAPI_GRO_CB(skb)->frag0 = NULL;
	NAPI_GRO_CB(skb)->frag0_len = 0;

	if (skb_mac_header(skb) == skb_tail_pointer(skb) && // the headers also live in frags[0]: the scatter-gather case
	    pinfo->nr_frags &&
	    !PageHighMem(skb_frag_page(frag0))) {
		NAPI_GRO_CB(skb)->frag0 = skb_frag_address(frag0);
		NAPI_GRO_CB(skb)->frag0_len = skb_frag_size(frag0);
	}
}

napi_gro_cb

skb->cb carries a different meaning at each layer; at this layer it is interpreted as a struct napi_gro_cb, which skb_gro_reset_offset() initializes.
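
The accessor used throughout is just a cast over that control buffer, as defined in include/linux/netdevice.h:

/* NAPI_GRO_CB reinterprets skb->cb (a 48-byte scratch area) as the GRO
 * control block for the duration of GRO processing. */
#define NAPI_GRO_CB(skb) ((struct napi_gro_cb *)(skb)->cb)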

struct napi_gro_cb {
	/* Virtual address of skb_shinfo(skb)->frags[0].page + offset. */
	void *frag0;

	/* Length of frag0. */
	unsigned int frag0_len;

	/* This indicates where we are processing relative to skb->data. */
	int data_offset;

	/* This is non-zero if the packet cannot be merged with the new skb. */
	u16 flush;

	/* Save the IP ID here and check when we get to the transport layer */
	u16 flush_id;

	/* Number of segments aggregated. */
	u16 count;

	/* Start offset for remote checksum offload */
	u16 gro_remcsum_start;

	/* jiffies when first packet was created/queued */
	unsigned long age;

	/* Used in ipv6_gro_receive() and foo-over-udp */
	u16 proto;

	/* This is non-zero if the packet may be of the same flow. */
	u8 same_flow:1;

	/* Used in tunnel GRO receive */
	u8 encap_mark:1;

	/* GRO checksum is valid */
	u8 csum_valid:1;

	/* Number of checksums via CHECKSUM_UNNECESSARY */
	u8 csum_cnt:3;

	/* Free the skb? */
	u8 free:2;
#define NAPI_GRO_FREE		1
#define NAPI_GRO_FREE_STOLEN_HEAD	2

	/* Used in foo-over-udp, set in udp[46]_gro_receive */
	u8 is_ipv6:1;

	/* Used in GRE, set in fou/gue_gro_receive */
	u8 is_fou:1;

	/* 6 bit hole */

	/* used to support CHECKSUM_COMPLETE for tunneling protocols */
	__wsum csum;

	/* used in skb_gro_receive() slow path */
	struct sk_buff *last;
};

napi_skb_finish

napi_skb_finish() turns the result of dev_gro_receive() into an action: whether GRO succeeded, whether the skb should traverse the protocol stack, and whether the merge is complete so that the skb can be freed.
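
For reference, these are the possible return values (enum gro_result in include/linux/netdevice.h); the per-value annotations are a summary of the handling below, not kernel comments:

enum gro_result {
	GRO_MERGED,        /* merged into a held flow; skb still owns some data */
	GRO_MERGED_FREE,   /* merged; the skb itself may now be freed */
	GRO_HELD,          /* held on gro_list as the first packet of a flow */
	GRO_NORMAL,        /* not GRO-able: deliver to the stack as-is */
	GRO_DROP,          /* drop the packet */
};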

static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
	switch (ret) {
	case GRO_NORMAL: // traverse the protocol stack
		if (netif_receive_skb_internal(skb))
			ret = GRO_DROP;
		break;

	case GRO_DROP: // drop outright
		kfree_skb(skb);
		break;

	case GRO_MERGED_FREE: // the payload was merged by GRO; the skb can be freed
		if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD) { // both the head-area page and the frag pages were moved into the GRO flow
			skb_dst_drop(skb);
			kmem_cache_free(skbuff_head_cache, skb); // recycle just the sk_buff struct
		} else {
			__kfree_skb(skb); // free everything, including the head area
		}
		break;

	case GRO_HELD:   // first packet of a GRO flow
	case GRO_MERGED: // chained at the flow's last position
		break;       // wait for later packets to merge
	}

	return ret;
}

dev_gro_receive

dev_gro_receive() is the entry point of GRO; it checks, layer by layer from L2 to L4, whether the current skb can be merged.
gro_list_prepare() first compares the MAC headers and marks same_flow.
It then calls inet_gro_receive() to descend into L3/L4 and attempt GRO. On return, the skb is added to gro_list if it is the first packet of its flow; otherwise the return value decides what gets handed to the protocol stack.

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	struct sk_buff **pp = NULL;
	struct packet_offload *ptype;
	__be16 type = skb->protocol;
	struct list_head *head = &offload_base;
	int same_flow;
	enum gro_result ret;
	int grow;

	if (!(skb->dev->features & NETIF_F_GRO))
		goto normal;

	if (skb_is_gso(skb) || skb_has_frag_list(skb) || skb->csum_bad)
		goto normal;

	gro_list_prepare(napi, skb); // walk every skb on napi->gro_list and set same_flow on those belonging to the same flow

	rcu_read_lock();
	list_for_each_entry_rcu(ptype, head, list) {
		if (ptype->type != type || !ptype->callbacks.gro_receive)
			continue;

		skb_set_network_header(skb, skb_gro_offset(skb));
		skb_reset_mac_len(skb);
		NAPI_GRO_CB(skb)->same_flow = 0;
		NAPI_GRO_CB(skb)->flush = 0;
		NAPI_GRO_CB(skb)->free = 0;
		NAPI_GRO_CB(skb)->encap_mark = 0;
		NAPI_GRO_CB(skb)->recursion_counter = 0;
		NAPI_GRO_CB(skb)->is_fou = 0;
		NAPI_GRO_CB(skb)->is_atomic = 1;
		NAPI_GRO_CB(skb)->gro_remcsum_start = 0;

		/* Setup for GRO checksum validation */
		switch (skb->ip_summed) {
		case CHECKSUM_COMPLETE:
			NAPI_GRO_CB(skb)->csum = skb->csum;
			NAPI_GRO_CB(skb)->csum_valid = 1;
			NAPI_GRO_CB(skb)->csum_cnt = 0;
			break;
		case CHECKSUM_UNNECESSARY:
			NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;
			NAPI_GRO_CB(skb)->csum_valid = 0;
			break;
		default:
			NAPI_GRO_CB(skb)->csum_cnt = 0;
			NAPI_GRO_CB(skb)->csum_valid = 0;
		}

		pp = ptype->callbacks.gro_receive(&napi->gro_list, skb); // inet_gro_receive
		break;
	}
	rcu_read_unlock();

	if (&ptype->list == head)
		goto normal;

	same_flow = NAPI_GRO_CB(skb)->same_flow;
	ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED; // GRO_MERGED_FREE: the skb was merged and may be freed; GRO_MERGED: the skb's headers were stripped and it was chained at the flow's last position

	if (pp) { // the flow pp points at must be flushed
		struct sk_buff *nskb = *pp;

		*pp = nskb->next;
		nskb->next = NULL;
		napi_gro_complete(nskb); // hand it to the protocol stack
		napi->gro_count--;
	}

	if (same_flow) // merge succeeded: return GRO_MERGED_FREE or GRO_MERGED
		goto ok;

	if (NAPI_GRO_CB(skb)->flush)
		goto normal; // return GRO_NORMAL: traverse the protocol stack

	// nothing was merged: the skb is the first packet of its flow
	if (unlikely(napi->gro_count >= MAX_GRO_SKBS)) { // at most 8 GRO flows are kept
		struct sk_buff *nskb = napi->gro_list;

		/* locate the end of the list to select the 'oldest' flow */
		while (nskb->next) { // walk to the tail: the oldest flow
			pp = &nskb->next;
			nskb = *pp;
		}
		*pp = NULL;
		nskb->next = NULL;
		napi_gro_complete(nskb); // hand the oldest flow to the protocol stack
	} else {
		napi->gro_count++;
	}
	NAPI_GRO_CB(skb)->count = 1;
	NAPI_GRO_CB(skb)->age = jiffies;
	NAPI_GRO_CB(skb)->last = skb;
	skb_shinfo(skb)->gso_size = skb_gro_len(skb);
	skb->next = napi->gro_list;
	napi->gro_list = skb; // insert at the head of gro_list
	ret = GRO_HELD; // held by GRO, but nothing merged yet

pull:
	grow = skb_gro_offset(skb) - skb_headlen(skb);
	if (grow > 0)
		gro_pull_from_frag0(skb, grow); // copy the header bytes still in frag0 into the head area

ok:
	return ret;

normal:
	ret = GRO_NORMAL;
	goto pull;
}

static void gro_list_prepare(struct napi_struct *napi, struct sk_buff *skb)
{
	struct sk_buff *p;
	unsigned int maclen = skb->dev->hard_header_len;
	u32 hash = skb_get_hash_raw(skb);

	for (p = napi->gro_list; p; p = p->next) {
		unsigned long diffs;

		NAPI_GRO_CB(p)->flush = 0;

		if (hash != skb_get_hash_raw(p)) {
			NAPI_GRO_CB(p)->same_flow = 0;
			continue;
		}

		diffs = (unsigned long)p->dev ^ (unsigned long)skb->dev;
		diffs |= p->vlan_tci ^ skb->vlan_tci;
		diffs |= skb_metadata_dst_cmp(p, skb);
		if (maclen == ETH_HLEN)
			diffs |= compare_ether_header(skb_mac_header(p),
						      skb_mac_header(skb));
		else if (!diffs)
			diffs = memcmp(skb_mac_header(p),
				       skb_mac_header(skb),
				       maclen);
		NAPI_GRO_CB(p)->same_flow = !diffs; // L2 header comparison
	}
}

inet_gro_receive

dev_gro_receive() has already set same_flow on the gro_list entries; inet_gro_receive() mainly uses the IP header to mark flush. Only packets that are same_flow and do not require a flush can be merged by GRO.
At the IP layer, only packets that all carry DF, or all do not, can be merged; for packets without DF, the IP ID must increase by exactly 1 per packet. Flows with a fixed IP ID (identical IDs across packets) are also supported.
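
To make the ID arithmetic concrete, here is a userspace sketch (illustration only, not kernel code, though the names mirror the function below) of the increment-by-one check: flush_id is the 16-bit difference between the new packet's IP ID and the held packet's, and for a no-DF flow it must equal the number of segments already aggregated.

/* Userspace sketch of the IP ID check in inet_gro_receive(). */
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

/* Returns a non-zero "flush" value when the no-DF ID rule is violated. */
static uint16_t ip_id_flush(uint16_t new_id, uint16_t held_id,
			    uint16_t aggregated_count, bool df)
{
	uint16_t flush_id = (uint16_t)(new_id - held_id);

	if (!df) {                            /* no DF: ID must advance by 1 per packet */
		flush_id ^= aggregated_count; /* expected difference == count */
		flush_id = flush_id ? 0xFFFF : 0;
	}
	return flush_id;
}

int main(void)
{
	/* the held packet has ID 100 and 1 aggregated segment */
	printf("%u\n", ip_id_flush(101, 100, 1, false)); /* 0: mergeable */
	printf("%u\n", ip_id_flush(103, 100, 1, false)); /* 65535: flush */
	printf("%u\n", ip_id_flush(100, 100, 1, true));  /* DF flow: raw difference, checked later at the TCP layer */
	return 0;
}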

struct sk_buff **inet_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{
	const struct net_offload *ops;
	struct sk_buff **pp = NULL;
	struct sk_buff *p;
	const struct iphdr *iph;
	unsigned int hlen;
	unsigned int off;
	unsigned int id;
	int flush = 1;
	int proto;

	off = skb_gro_offset(skb);
	hlen = off + sizeof(*iph);
	iph = skb_gro_header_fast(skb, off);
	if (skb_gro_header_hard(skb, hlen)) { // the header may live in the head area
		iph = skb_gro_header_slow(skb, hlen, off);
		if (unlikely(!iph)) // not in the head area either: the packet is malformed
			goto out;
	}

	proto = iph->protocol;

	rcu_read_lock();
	ops = rcu_dereference(inet_offloads[proto]);
	if (!ops || !ops->callbacks.gro_receive)
		goto out_unlock;

	if (*(u8 *)iph != 0x45) // IPv4 without options only
		goto out_unlock;

	if (unlikely(ip_fast_csum((u8 *)iph, 5)))
		goto out_unlock;

	id = ntohl(*(__be32 *)&iph->id);
	// flush if the IP total length doesn't match skb_gro_len, MF is set, or the fragment offset is non-zero.
	// DF itself is masked out: flows may all carry DF or all not, since some servers use IP_MTU_DISCOVER/IP_PMTUDISC_PROBE
	flush = (u16)((ntohl(*(__be32 *)iph) ^ skb_gro_len(skb)) | (id & ~IP_DF));
	id >>= 16;

	for (p = *head; p; p = p->next) { // walk gro_list
		struct iphdr *iph2;
		u16 flush_id;

		if (!NAPI_GRO_CB(p)->same_flow) // same_flow was set in gro_list_prepare; only same-flow packets are considered
			continue;

		iph2 = (struct iphdr *)(p->data + off);
		/* The above works because, with the exception of the top
		 * (inner most) layer, we only aggregate pkts with the same
		 * hdr length so all the hdrs we'll need to verify will start
		 * at the same offset.
		 */
		if ((iph->protocol ^ iph2->protocol) | // only skbs with the same offsets are merged: re-check after reading iph2 at that offset
		    ((__force u32)iph->saddr ^ (__force u32)iph2->saddr) |
		    ((__force u32)iph->daddr ^ (__force u32)iph2->daddr)) {
			NAPI_GRO_CB(p)->same_flow = 0;
			continue;
		}

		/* All fields must match except length and checksum. */
		NAPI_GRO_CB(p)->flush |=
			(iph->ttl ^ iph2->ttl) |                           // TTL differs
			(iph->tos ^ iph2->tos) |                           // TOS differs
			((iph->frag_off ^ iph2->frag_off) & htons(IP_DF)); // DF flag differs: no GRO

		NAPI_GRO_CB(p)->flush |= flush;

		/* We need to store of the IP ID check to be included later
		 * when we can verify that this packet does in fact belong
		 * to a given flow.
		 */
		flush_id = (u16)(id - ntohs(iph2->id)); // without DF, the IP ID must advance by 1 per packet

		/* This bit of code makes it much easier for us to identify
		 * the cases where we are doing atomic vs non-atomic IP ID
		 * checks. Specifically an atomic check can return IP ID
		 * values 0 - 0xFFFF, while a non-atomic check can only
		 * return 0 or 0xFFFF.
		 */
		if (!NAPI_GRO_CB(p)->is_atomic || // flush_id must be checked
		    !(iph->frag_off & htons(IP_DF))) { // the skb carries no DF
			flush_id ^= NAPI_GRO_CB(p)->count; // without DF, the ID must advance by 1 per packet
			flush_id = flush_id ? 0xFFFF : 0;  // 0 when it advances by exactly 1, otherwise 0xFFFF
		}

		/* If the previous IP ID value was based on an atomic
		 * datagram we can overwrite the value and ignore it.
		 */
		if (NAPI_GRO_CB(skb)->is_atomic) // initialized to 1 in dev_gro_receive
			NAPI_GRO_CB(p)->flush_id = flush_id; // for DF packets, the raw ID difference; for no-DF packets whose ID advances by 1, 0; otherwise 0xFFFF
		else
			NAPI_GRO_CB(p)->flush_id |= flush_id;
	}

	NAPI_GRO_CB(skb)->is_atomic = !!(iph->frag_off & htons(IP_DF)); // DF set means atomic: no flush_id check needed; without DF the ID must advance by 1 per packet
	NAPI_GRO_CB(skb)->flush |= flush;
	skb_set_network_header(skb, off);

	/* The above will be needed by the transport layer if there is one
	 * immediately following this IP hdr.
	 */

	/* Note : No need to call skb_gro_postpull_rcsum() here,
	 * as we already checked checksum over ipv4 header was 0
	 */
	skb_gro_pull(skb, sizeof(*iph));
	skb_set_transport_header(skb, skb_gro_offset(skb));

	pp = call_gro_receive(ops->callbacks.gro_receive, head, skb); // L4 GRO: tcp4_gro_receive

out_unlock:
	rcu_read_unlock();

out:
	NAPI_GRO_CB(skb)->flush |= flush;

	return pp;
}

tcp4_gro_receive

The TCP layer mainly checks that the ports match and that the TCP options are bitwise identical, and whether the segment is a control segment: control segments must be pushed to the protocol stack quickly.
Out-of-order segments must also be delivered immediately.
If nothing forbids it, skb_gro_receive() is called to attempt the actual merge.
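
As a userspace illustration of the flag handling (the TCP_FLAG_* values mirror include/net/tcp.h, and this condenses checks that the kernel performs at two separate points in the function below): CWR on the new segment forces a flush, flags other than CWR/FIN/PSH must match between the held and the new segment, and URG/PSH/RST/SYN/FIN on the new segment force delivery.

/* Userspace sketch of tcp_gro_receive()'s flag checks; illustration only. */
#include <stdio.h>
#include <stdint.h>

#define TCP_FLAG_CWR 0x00800000u
#define TCP_FLAG_URG 0x00200000u
#define TCP_FLAG_ACK 0x00100000u
#define TCP_FLAG_PSH 0x00080000u
#define TCP_FLAG_RST 0x00040000u
#define TCP_FLAG_SYN 0x00020000u
#define TCP_FLAG_FIN 0x00010000u

static int must_flush(uint32_t flags, uint32_t flags2) /* new segment, held segment */
{
	uint32_t flush = flags & TCP_FLAG_CWR;                  /* congestion mark flushes */
	flush |= (flags ^ flags2) &
		 ~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH); /* any other flag change flushes */
	flush |= flags & (TCP_FLAG_URG | TCP_FLAG_PSH |
			  TCP_FLAG_RST | TCP_FLAG_SYN | TCP_FLAG_FIN);
	return flush != 0;
}

int main(void)
{
	printf("%d\n", must_flush(TCP_FLAG_ACK, TCP_FLAG_ACK));                /* 0: plain data */
	printf("%d\n", must_flush(TCP_FLAG_ACK | TCP_FLAG_PSH, TCP_FLAG_ACK)); /* 1: PSH pushes */
	printf("%d\n", must_flush(TCP_FLAG_ACK | TCP_FLAG_SYN, TCP_FLAG_ACK)); /* 1: SYN flushes */
	return 0;
}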

struct sk_buff **tcp_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{
	struct sk_buff **pp = NULL;
	struct sk_buff *p;
	struct tcphdr *th;
	struct tcphdr *th2;
	unsigned int len;
	unsigned int thlen;
	__be32 flags;
	unsigned int mss = 1;
	unsigned int hlen;
	unsigned int off;
	int flush = 1;
	int i;

	off = skb_gro_offset(skb);
	hlen = off + sizeof(*th);
	th = skb_gro_header_fast(skb, off);
	if (skb_gro_header_hard(skb, hlen)) { // the header lives in the head area
		th = skb_gro_header_slow(skb, hlen, off);
		if (unlikely(!th))
			goto out;
	}

	thlen = th->doff * 4;
	if (thlen < sizeof(*th))
		goto out;

	hlen = off + thlen;
	if (skb_gro_header_hard(skb, hlen)) { // the whole TCP header, including options
		th = skb_gro_header_slow(skb, hlen, off);
		if (unlikely(!th))
			goto out;
	}

	skb_gro_pull(skb, thlen);

	len = skb_gro_len(skb);
	flags = tcp_flag_word(th);

	for (; (p = *head); head = &p->next) {
		if (!NAPI_GRO_CB(p)->same_flow) // only merge same_flow packets
			continue;

		th2 = tcp_hdr(p);

		if (*(u32 *)&th->source ^ *(u32 *)&th2->source) { // source/destination ports differ
			NAPI_GRO_CB(p)->same_flow = 0; // not the same flow after all
			continue;
		}

		goto found;
	}

	goto out_check_final;

found: // found a same-flow packet
	/* Include the IP ID check below from the inner most IP hdr */
	flush = NAPI_GRO_CB(p)->flush;
	flush |= (__force int)(flags & TCP_FLAG_CWR); // congestion mark: flush
	flush |= (__force int)((flags ^ tcp_flag_word(th2)) & // the two packets' flags differ,
			       ~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH)); // and the difference is not one of these three
	flush |= (__force int)(th->ack_seq ^ th2->ack_seq); // the ack sequence numbers differ
	for (i = sizeof(*th); i < thlen; i += 4) // compare the two packets' TCP options
		flush |= *(u32 *)((u8 *)th + i) ^
			 *(u32 *)((u8 *)th2 + i);

	/* When we receive our second frame we can made a decision on if we
	 * continue this flow as an atomic flow with a fixed ID or if we use
	 * an incrementing ID.
	 */
	if (NAPI_GRO_CB(p)->flush_id != 1 ||
	    NAPI_GRO_CB(p)->count != 1 ||
	    !NAPI_GRO_CB(p)->is_atomic)
		flush |= NAPI_GRO_CB(p)->flush_id;
	else
		NAPI_GRO_CB(p)->is_atomic = false; // flush_id == 1 && count == 1 && is_atomic: the first packet carried DF and this second packet's ID grew by 1, so no flush now, but mark the flow non-atomic so later packets are checked

	mss = skb_shinfo(p)->gso_size;

	flush |= (len - 1) >= mss; // larger than the gso size: time to hand up to the protocol stack
	flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq); // the skb is not the next in-sequence segment of p; e.g. out-of-order TCP segments go straight to the protocol stack

	if (flush || skb_gro_receive(head, skb)) { // flush was set, or the merge failed
		mss = 1; // GRO is bypassed; mss is forced to 1 so the flush computation below clears flush
		goto out_check_final;
	}

	// merge succeeded
	p = *head; // head points at the GRO flow
	th2 = tcp_hdr(p);
	tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH); // if the skb carried FIN or PSH, propagate it to the flow's header after a successful merge

out_check_final:
	flush = len < mss; // with mss forced to 1 (GRO bypassed) this clears flush; otherwise a sub-mss segment means the flow should be delivered to the protocol stack
	flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH | // any of these TCP flags forces delivery to the protocol stack
					TCP_FLAG_RST | TCP_FLAG_SYN |
					TCP_FLAG_FIN));

	if (p && (!NAPI_GRO_CB(skb)->same_flow || flush)) // a same-flow packet was found, but it must be flushed
		pp = head; // return the flow that needs flushing

out:
	NAPI_GRO_CB(skb)->flush |= (flush != 0); // if set, the caller delivers the skb to the protocol stack

	return pp;
}

skb_gro_receive

skb_gro_receive() performs the actual merge: when the payload lives in page fragments it is merged into the flow head's frags[] array (possibly stealing the page behind the skb's head area);
otherwise the whole skb is chained onto the flow via frag_list, at the position tracked by NAPI_GRO_CB(p)->last (see the sketch below).
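
Condensed, the choice between the three merge paths looks like this (an illustrative userspace sketch of the branch structure only; the real function below also adjusts offsets and truesize):

/* Userspace sketch of which merge path skb_gro_receive() takes. */
#include <stdio.h>
#include <stdbool.h>

enum gro_merge_path {
	MERGE_FRAGS,      /* graft the skb's page frags into the flow head */
	MERGE_STEAL_HEAD, /* also steal the page backing the skb's head area */
	MERGE_FRAG_LIST,  /* chain the whole skb via frag_list / ->last */
};

static enum gro_merge_path merge_path(unsigned int headlen, /* bytes in the linear head */
				      unsigned int offset,  /* where the payload starts */
				      bool head_frag,       /* head is page-backed */
				      bool frags_fit)       /* enough free frag slots */
{
	if (headlen <= offset)                    /* no payload in the linear head */
		return frags_fit ? MERGE_FRAGS : MERGE_FRAG_LIST;
	if (head_frag && frags_fit)               /* payload in a page-backed head */
		return MERGE_STEAL_HEAD;
	return MERGE_FRAG_LIST;                   /* fallback: chain the skb itself */
}

int main(void)
{
	printf("%d\n", merge_path(64, 64, true, true));   /* MERGE_FRAGS */
	printf("%d\n", merge_path(128, 64, true, true));  /* MERGE_STEAL_HEAD */
	printf("%d\n", merge_path(128, 64, false, true)); /* MERGE_FRAG_LIST */
	return 0;
}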

int skb_gro_receive(struct sk_buff **head, struct sk_buff *skb)
{
	struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb);
	unsigned int offset = skb_gro_offset(skb); // offset where the payload starts
	unsigned int headlen = skb_headlen(skb);   // bytes of data in the head area
	unsigned int len = skb_gro_len(skb);       // length of the skb's payload
	struct sk_buff *lp, *p = *head;
	unsigned int delta_truesize;

	if (unlikely(p->len + len >= 65536))
		return -E2BIG;

	lp = NAPI_GRO_CB(p)->last;
	pinfo = skb_shinfo(lp);

	if (headlen <= offset) { // no payload in the head area: graft the skb's frags onto p
		skb_frag_t *frag;
		skb_frag_t *frag2;
		int i = skbinfo->nr_frags;
		int nr_frags = pinfo->nr_frags + i;

		if (nr_frags > MAX_SKB_FRAGS)
			goto merge;

		offset -= headlen;
		pinfo->nr_frags = nr_frags;
		skbinfo->nr_frags = 0;

		frag = pinfo->frags + nr_frags;
		frag2 = skbinfo->frags + i;
		do {
			*--frag = *--frag2;
		} while (--i); // move the skb's frags behind p's

		frag->page_offset += offset; // strip the leading header bytes
		skb_frag_size_sub(frag, offset);

		/* all fragments truesize : remove (head size + sk_buff) */
		delta_truesize = skb->truesize -
				 SKB_TRUESIZE(skb_end_offset(skb));

		skb->truesize -= skb->data_len; // the frag data now belongs to p
		skb->len -= skb->data_len;
		skb->data_len = 0;

		NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE; // mark as merged: the skb can be freed
		goto done;
	} else if (skb->head_frag) { // some payload is in the head area, and the head is page-backed (scatter-gather)
		int nr_frags = pinfo->nr_frags;
		skb_frag_t *frag = pinfo->frags + nr_frags;
		struct page *page = virt_to_head_page(skb->head);
		unsigned int first_size = headlen - offset; // payload bytes in the head area
		unsigned int first_offset;

		if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS)
			goto merge;

		first_offset = skb->data -
			       (unsigned char *)page_address(page) +
			       offset;

		pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags;

		frag->page.p	  = page; // steal the page backing the head area into p
		frag->page_offset = first_offset;
		skb_frag_size_set(frag, first_size);

		memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags); // then move the skb's frags into p as well
		/* We dont need to clear skbinfo->nr_frags here */

		delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff));
		NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD; // record that the head page was stolen
		goto done;
	}

	// payload in a head area that is not page-backed, or p ran out of frag slots
merge:
	delta_truesize = skb->truesize;
	if (offset > headlen) { // no payload in the head area, but p's frags are full
		unsigned int eat = offset - headlen;

		skbinfo->frags[0].page_offset += eat; // strip the header bytes from frags[0]
		skb_frag_size_sub(&skbinfo->frags[0], eat);
		skb->data_len -= eat;
		skb->len -= eat;
		offset = headlen; // now the frags hold payload only
	}

	__skb_pull(skb, offset); // point data at the payload

	if (NAPI_GRO_CB(p)->last == p) // p is alone in the flow
		skb_shinfo(p)->frag_list = skb; // hang the skb on p's frag_list
	else
		NAPI_GRO_CB(p)->last->next = skb; // append the skb at the tail of p's chain
	NAPI_GRO_CB(p)->last = skb; // update last
	__skb_header_release(skb);
	lp = p;

done:
	NAPI_GRO_CB(p)->count++;
	p->data_len += len;
	p->truesize += delta_truesize;
	p->len += len;
	if (lp != p) {
		lp->data_len += len;
		lp->truesize += delta_truesize;
		lp->len += len;
	}
	NAPI_GRO_CB(skb)->same_flow = 1; // merge done: mark the skb same_flow
	return 0;
}