Linux 4.9 kernel RCU implementation

Background reading

linux/Documentation/RCU/Design/Data-Structures/Data-Structures.html
http://lwn.net/Articles/652156/

Basic terminology

quiescent state (qs): a state in which the CPU is not inside a reader critical section
grace period (gp): the window an updater has to wait out; once every CPU has passed through at least one qs, a gp is complete
extended quiescent state (eqs): e.g. a CPU in dyntick idle, where the scheduling-clock tick is disabled, so that CPU will not report any qs on its own
expedited grace period (egp): makes a gp finish sooner, trading extra CPU overhead for faster completion
flavor: the two main ones are rcu_sched_state and rcu_bh_state; for read-side sections entered with rcu_read_lock_bh/rcu_read_unlock_bh in softirq context, finishing a softirq handler counts as a qs,
which avoids overly long grace periods and callback pile-ups; each flavor has its own tree of rcu_node structures
nocbs: callback offloading, i.e. RCU callbacks are processed by dedicated rcuo kthreads

RCU's basic design requires readers to have essentially no overhead, while updaters may tolerate some delay before completion. That is why RCU_SOFTIRQ sits at the lowest priority among the softirqs.
This article also does not cover the CONFIG_PREEMPT=y / CONFIG_PREEMPT_RCU=y case.
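
Before going into the implementation, a minimal sketch of the API being analyzed may help; the struct, the global pointer and the callback names below are made up for illustration, and updaters are assumed to be serialized by the caller:

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct my_data {
	int val;
	struct rcu_head rcu;		/* needed by call_rcu() */
};

static struct my_data __rcu *global_ptr;

/* Reader side: no lock, no atomic op, just critical-section markers. */
static int my_read(void)
{
	struct my_data *p;
	int val = -1;

	rcu_read_lock();
	p = rcu_dereference(global_ptr);
	if (p)
		val = p->val;
	rcu_read_unlock();
	return val;
}

static void my_free_cb(struct rcu_head *head)
{
	kfree(container_of(head, struct my_data, rcu));
}

/* Updater side: publish the new version, then let a grace period pass
 * before the old version is freed (call_rcu() here; synchronize_rcu()
 * followed by kfree() would work as well). */
static void my_update(struct my_data *new)
{
	struct my_data *old;

	old = rcu_dereference_protected(global_ptr, 1);
	rcu_assign_pointer(global_ptr, new);
	if (old)
		call_rcu(&old->rcu, my_free_cb);
}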

no_hz and rcu

no_hz

Since 3.10, the tick-related configuration offers three choices:
CONFIG_HZ_PERIODIC is the old-style mode wherein the timer tick runs at all times.
CONFIG_NO_HZ_IDLE (the default setting) will cause the tick to be disabled at idle, the way setting CONFIG_NO_HZ did in earlier kernels.
CONFIG_NO_HZ_FULL will enable the “full” tickless mode.
CONFIG_NO_HZ_FULL_SYSIDLE additionally allows the timekeeping CPU to stop its tick once the whole system goes idle (see below).

The tick interrupt is a per-CPU interrupt. Starting with 2.6.21, CONFIG_NO_HZ (CONFIG_NO_HZ_IDLE as of 3.10) introduced dynamic ticks to eliminate the tick while a CPU is idle.
CONFIG_NO_HZ_FULL builds on CONFIG_NO_HZ_IDLE and additionally stops the tick on a CPU that is running only a single task; CONFIG_NO_HZ_IDLE remains the default setting.

The nohz_full= boot parameter can also name the adaptive-tick CPUs explicitly, but at least one CPU must be left for timekeeping so that gettimeofday() keeps returning accurate time.
Because CONFIG_NO_HZ_FULL always keeps that one timekeeping CPU ticking even when every CPU is idle, CONFIG_NO_HZ_FULL_SYSIDLE was added so that the timekeeping CPU's tick can be stopped as well once the whole system is idle.
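
As an illustration (the CPU list is arbitrary): booting with nohz_full=1-7 leaves CPU 0 as the timekeeping/housekeeping CPU and lets CPUs 1-7 drop the tick whenever they are running a single task.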

(Nearly) full tickless operation in 3.10
linux/Documentation/timers/NO_HZ.txt
nohz_full=godmode ?

rcu and no_hz

With NO_HZ enabled, the current CPU may still have callbacks queued; shutting off the tick interrupt at that point would stretch out the grace period.

With CONFIG_RCU_FAST_NO_HZ=y such a CPU still enters adaptive-tick mode, but it wakes up every 4 jiffies to make sure the current gp keeps making progress.

CONFIG_RCU_NOCB_CPU=y provides RCU callback offloading: callbacks are invoked from rcuo kthreads.
CONFIG_RCU_NOCB_CPU_NONE=y: no CPU is offloaded.
CONFIG_RCU_NOCB_CPU_ZERO=y: only CPU 0 is offloaded.
CONFIG_RCU_NOCB_CPU_ALL=y: every CPU is offloaded (the default choice in the configuration discussed here).
The rcu_nocbs=1,3-5 boot parameter adds offloaded CPUs on top of those Kconfig choices; with CONFIG_RCU_NOCB_CPU_ALL=y, rcu_nocbs has no effect.

An offloaded CPU can enter adaptive-tick mode without RCU getting in the way.
The CPU an rcuo kthread runs on can be chosen from userspace, or is otherwise left to the scheduler (so it is not necessarily the CPU you would like).
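
For example (CPU and thread numbers are arbitrary): the rcu_sched offload kthread for CPU 3 shows up as rcuos/3 (rcuob/3 for the rcu_bh flavor), and it can be pinned to a housekeeping CPU from userspace with something like taskset -cp 0 <pid of rcuos/3>.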

rcu_read_lock/rcu_read_unlock

CONFIG_PREEMPT_RCU (usually not enabled in distribution kernels) makes read-side critical sections preemptible; the preemptible implementation mainly just increments current->rcu_read_lock_nesting.
The non-preemptible rcu_read_lock/rcu_read_unlock depend on CONFIG_PREEMPT_COUNT: if it is set they disable/enable preemption, otherwise they compile to nothing.
So a read-side critical section may be interrupted; even if the interrupt handler performs an update with call_rcu, the reader is still guaranteed to see the old data intact, and the callback that frees the old pointer only runs afterwards.

static inline void rcu_read_lock(void)
{
__rcu_read_lock();
}
static inline void rcu_read_unlock(void)
{
__rcu_read_unlock();
}
#ifdef CONFIG_PREEMPT_RCU
void __rcu_read_lock(void)
{
current->rcu_read_lock_nesting++;
barrier(); /* critical section after entry code. */
}
void __rcu_read_unlock(void)
{
struct task_struct *t = current;
if (t->rcu_read_lock_nesting != 1) {
--t->rcu_read_lock_nesting;
} else {
barrier(); /* critical section before exit code. */
t->rcu_read_lock_nesting = INT_MIN;
barrier(); /* assign before ->rcu_read_unlock_special load */
if (unlikely(READ_ONCE(t->rcu_read_unlock_special.s)))
rcu_read_unlock_special(t);
barrier(); /* ->rcu_read_unlock_special load before assign */
t->rcu_read_lock_nesting = 0;
}
}
#else
static inline void __rcu_read_lock(void)
{
if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
preempt_disable();
}
static inline void __rcu_read_unlock(void)
{
if (IS_ENABLED(CONFIG_PREEMPT_COUNT))
preempt_enable();
}
#endif

synchronize_rcu/call_rcu

void synchronize_sched(void)
{
if (rcu_blocking_is_gp())
return;
if (rcu_gp_is_expedited()) //开启rcu_expedited选项
synchronize_sched_expedited();
else
wait_rcu_gp(call_rcu_sched); //
}
#define _wait_rcu_gp(checktiny, ...) \
do { \
call_rcu_func_t __crcu_array[] = { __VA_ARGS__ }; \
struct rcu_synchronize __rs_array[ARRAY_SIZE(__crcu_array)]; \
__wait_rcu_gp(checktiny, ARRAY_SIZE(__crcu_array), \
__crcu_array, __rs_array); \
} while (0)
#define wait_rcu_gp(...) _wait_rcu_gp(false, __VA_ARGS__)
void __wait_rcu_gp(bool checktiny, int n, call_rcu_func_t *crcu_array,
struct rcu_synchronize *rs_array)
{
int i;
/* Initialize and register callbacks for each flavor specified. */
for (i = 0; i < n; i++) {
if (checktiny &&
(crcu_array[i] == call_rcu ||
crcu_array[i] == call_rcu_bh)) {
might_sleep();
continue;
}
init_rcu_head_on_stack(&rs_array[i].head);
init_completion(&rs_array[i].completion);
(crcu_array[i])(&rs_array[i].head, wakeme_after_rcu); //调用call_rcu_sched
}
/* Wait for all callbacks to be invoked. */
for (i = 0; i < n; i++) {
if (checktiny &&
(crcu_array[i] == call_rcu ||
crcu_array[i] == call_rcu_bh))
continue;
wait_for_completion(&rs_array[i].completion); //阻塞等待
destroy_rcu_head_on_stack(&rs_array[i].head);
}
}
void call_rcu_sched(struct rcu_head *head, rcu_callback_t func)
{
__call_rcu(head, func, &rcu_sched_state, -1, 0); //func为回调,对同步等待synchronize_rcu来说就是调用wakeme_after_rcu唤醒
}
static void
__call_rcu(struct rcu_head *head, rcu_callback_t func,
struct rcu_state *rsp, int cpu, bool lazy)
{
unsigned long flags;
struct rcu_data *rdp;
head->func = func;
head->next = NULL;
local_irq_save(flags); //percpu变量修改关中断
rdp = this_cpu_ptr(rsp->rda); //this cpu rcu_data
/* Add the callback to our list. */
//rdp->nxttail[RCU_NEXT_TAIL] == NULL说明当前cpu是nocbs cpu,或者是未初始化
if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
int offline;
if (cpu != -1)
rdp = per_cpu_ptr(rsp->rda, cpu); //指定cpu上的rcu_data
if (likely(rdp->mynode)) {
/* Post-boot, so this should be for a no-CBs CPU. */
// nocbs cpu把callback存放到rdp->nocb_head队列中
offline = !__call_rcu_nocb(rdp, head, lazy, flags);
/* Offline CPU, _call_rcu() illegal, leak callback. */
local_irq_restore(flags);
return;
}
//call_rcu在rcu_init之前被调用
/*
* Very early boot, before rcu_init(). Initialize if needed
* and then drop through to queue the callback.
*/
if (!likely(rdp->nxtlist))
init_default_callback_list(rdp);
}
WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
if (lazy)
rdp->qlen_lazy++;
else
rcu_idle_count_callbacks_posted();
smp_mb(); /* Count before adding callback for rcu_barrier(). */
*rdp->nxttail[RCU_NEXT_TAIL] = head;
rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
/* Go handle any RCU core processing required. */
__call_rcu_core(rsp, rdp, head, flags);
local_irq_restore(flags);
}
/*
* Handle any core-RCU processing required by a call_rcu() invocation.
*/
static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
struct rcu_head *head, unsigned long flags)
{
bool needwake;
/*
* If called from an extended quiescent state, invoke the RCU
* core in order to force a re-evaluation of RCU's idleness.
*/
if (!rcu_is_watching()) //当前cpu处于eqs(idle), 触发软中断让RCU core重新评估idle状态
invoke_rcu_core(); //RCU_SOFTIRQ->rcu_process_callbacks
/* If interrupts were disabled or CPU offline, don't invoke RCU core. */
if (irqs_disabled_flags(flags) || cpu_is_offline(smp_processor_id()))
return;
/*
* Force the grace period if too many callbacks or too long waiting.
* Enforce hysteresis, and don't invoke force_quiescent_state()
* if some other CPU has recently done so. Also, don't bother
* invoking force_quiescent_state() if the newly enqueued callback
* is the only one waiting for a grace period to complete.
*/
if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) { //比上次检查多了qhimark个pending
/* Are we ignoring a completed grace period? */
note_gp_changes(rsp, rdp);
/* Start a new grace period if one not already started. */
if (!rcu_gp_in_progress(rsp)) {
struct rcu_node *rnp_root = rcu_get_root(rsp);
raw_spin_lock_rcu_node(rnp_root);
needwake = rcu_start_gp(rsp);
raw_spin_unlock_rcu_node(rnp_root);
if (needwake)
rcu_gp_kthread_wake(rsp); //唤醒gp kthread
} else { //很多pending了,并且当前有gp在处理
/* Give the grace period a kick. */
rdp->blimit = LONG_MAX;
if (rsp->n_force_qs == rdp->n_force_qs_snap &&
*rdp->nxttail[RCU_DONE_TAIL] != head)
force_quiescent_state(rsp); //标记fqs
rdp->n_force_qs_snap = rsp->n_force_qs;
rdp->qlen_last_fqs_check = rdp->qlen;
}
}
}

softirq->rcu_process_callbacks

invoke_rcu_core() raises RCU_SOFTIRQ, and rcu_process_callbacks() then runs in softirq context.

static void
__rcu_process_callbacks(struct rcu_state *rsp)
{
unsigned long flags;
bool needwake;
struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
/* Update RCU state based on any recent quiescent states. */
rcu_check_quiescent_state(rsp, rdp); //检查和上报新的gp
/* Does this CPU require a not-yet-started grace period? */
local_irq_save(flags);
if (cpu_needs_another_gp(rsp, rdp)) { //还需要等待下一个gp
raw_spin_lock_rcu_node(rcu_get_root(rsp)); /* irqs disabled. */
needwake = rcu_start_gp(rsp); //调整callback队列,如果需要注册新gp的need_future_gp,并返回是否需要初始化新的gp并唤醒gp kthread
raw_spin_unlock_irqrestore_rcu_node(rcu_get_root(rsp), flags);
if (needwake)
rcu_gp_kthread_wake(rsp);
} else {
local_irq_restore(flags);
}
/* If there are callbacks ready, invoke them. */
if (cpu_has_callbacks_ready_to_invoke(rdp)) //有callback需要马上调用,并且没有开启nocbs
invoke_rcu_callbacks(rsp, rdp); //调用rcu_do_batch()来调用callback
/* Do any needed deferred wakeups of rcuo kthreads. */
do_nocb_deferred_wakeup(rdp); //如果在之前nocbs入队的时候是关中断的,需要快速执行,就延迟到软中断来处理唤醒
}
static void rcu_process_callbacks(struct softirq_action *unused)
{
struct rcu_state *rsp;
if (cpu_is_offline(smp_processor_id()))
return;
for_each_rcu_flavor(rsp)
__rcu_process_callbacks(rsp);
}

qs report

qs state propagates upward, from the per-CPU rcu_data structures to the root of the rcu_node tree;
gp state flows the other way: the root node always holds the most recent gp information.

rcu_sched_qs and rcu_bh_qs report quiescent states for their respective flavors, and the points at which they detect a qs also differ.

rcu_sched_qs

1. rcu_note_context_switch (called from __schedule())
2. rcu_check_callbacks (called from update_process_times in the tick interrupt handler): if the tick interrupted a user task or the idle loop, a qs has passed
3. rcu_all_qs (i.e. rcu_note_voluntary_context_switch, called from rcu_check_callbacks and cond_resched_rcu_qs)

The tick interrupt handler calls update_process_times to update the current task's clock-related state, and that in turn calls rcu_check_callbacks.

/*
* Check to see if this CPU is in a non-context-switch quiescent state
* (user mode or idle loop for rcu, non-softirq execution for rcu_bh).
* Also schedule RCU core processing.
*
* This function must be called from hardirq context. It is normally
* invoked from the scheduling-clock interrupt. If rcu_pending returns
* false, there is no point in invoking rcu_check_callbacks().
*/
void rcu_check_callbacks(int user)
{
increment_cpu_stall_ticks();
if (user || rcu_is_cpu_rrupt_from_idle()) { //用户进程,或者idle loop,或者非嵌套中断中,说明不是中断在read关键区,可以尝试上报qs完成
/*
* Get here if this CPU took its interrupt from user
* mode or from the idle loop, and if this is not a
* nested interrupt. In this case, the CPU is in
* a quiescent state, so note it.
*
* No memory barrier is required here because both
* rcu_sched_qs() and rcu_bh_qs() reference only CPU-local
* variables that other CPUs neither access nor modify,
* at least not while the corresponding CPU is online.
*/
rcu_sched_qs();
rcu_bh_qs();
} else if (!in_softirq()) { //非软中断中,因此可以尝试上报软中断中的read关键区的qs
/*
* Get here if this CPU did not take its interrupt from
* softirq, in other words, if it is not interrupting
* a rcu_bh read-side critical section. This is an _bh
* critical section, so note it.
*/
rcu_bh_qs();
}
rcu_preempt_check_callbacks();
if (rcu_pending())
invoke_rcu_core(); //当前cpu有rcu相关的工作要做,比如说有qs完成,或者有callback可以马上被调用等
if (user)
rcu_note_voluntary_context_switch(current);
}
#define rcu_note_voluntary_context_switch(t) rcu_all_qs()
void rcu_all_qs(void)
{
unsigned long flags;
barrier(); /* Avoid RCU read-side critical sections leaking down. */
if (unlikely(raw_cpu_read(rcu_sched_qs_mask))) {
local_irq_save(flags);
rcu_momentary_dyntick_idle(); //dynticks+2
local_irq_restore(flags);
}
if (unlikely(raw_cpu_read(rcu_sched_data.cpu_no_qs.b.exp))) { //如果是加速qs, 通过ipi当前cpu被调度
/*
* Yes, we just checked a per-CPU variable with preemption
* enabled, so we might be migrated to some other CPU at
* this point. That is OK because in that case, the
* migration will supply the needed quiescent state.
* We might end up needlessly disabling preemption and
* invoking rcu_sched_qs() on the destination CPU, but
* the probability and cost are both quite low, so this
* should not be a problem in practice.
*/
preempt_disable();
rcu_sched_qs();
preempt_enable();
}
this_cpu_inc(rcu_qs_ctr); //每次rcu_all_qs调用加1,rdp->rcu_qs_ctr_snap记录了gp开始时的快照, 因为只会在user进程被时钟中断和cond_resched_rcu_qs让出失败时被调用,都可以视为一个qs
barrier(); /* Avoid RCU read-side critical sections leaking up. */
}
void rcu_sched_qs(void)
{
if (!__this_cpu_read(rcu_sched_data.cpu_no_qs.s)) //当前cpu已经有qs
return;
__this_cpu_write(rcu_sched_data.cpu_no_qs.b.norm, false); //标记qs
if (!__this_cpu_read(rcu_sched_data.cpu_no_qs.b.exp)) //expedited qs没有标记,则直接返回
return;
__this_cpu_write(rcu_sched_data.cpu_no_qs.b.exp, false); //expedited qs处理
rcu_report_exp_rdp(&rcu_sched_state,
this_cpu_ptr(&rcu_sched_data), true);
}

rcu_check_quiescent_state

rcu_check_quiescent_state is called on every pass of the RCU softirq handler.

/*
* Check to see if there is a new grace period of which this CPU
* is not yet aware, and if so, set up local rcu_data state for it.
* Otherwise, see if this CPU has just passed through its first
* quiescent state for this grace period, and record that fact if so.
*/
static void
rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
{
/* Check for grace-period ends and beginnings. */
//检查和处理新的gp
note_gp_changes(rsp, rdp);
/*
* Does this CPU still need to do its part for current grace period?
* If no, return and let the other CPUs do their part as well.
*/
//这个rdp的qs已经上报了
if (!rdp->core_needs_qs)
return;
/*
* Was there a quiescent state since the beginning of the grace
* period? If no, then exit and wait for the next call.
*/
if (rdp->cpu_no_qs.b.norm &&
rdp->rcu_qs_ctr_snap == __this_cpu_read(rcu_qs_ctr)) //当前cpu还要等qs,并且没有被rcu_all_qs调度过
return;
/*
* Tell RCU we are done (but rcu_report_qs_rdp() will be the
* judge of that).
*/
rcu_report_qs_rdp(rdp->cpu, rsp, rdp); //说明可能有qs,上报
}
/*
* Record a quiescent state for the specified CPU to that CPU's rcu_data
* structure. This must be called from the specified CPU.
*/
static void
rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
{
unsigned long flags;
unsigned long mask;
bool needwake;
struct rcu_node *rnp;
rnp = rdp->mynode;
raw_spin_lock_irqsave_rcu_node(rnp, flags);
if ((rdp->cpu_no_qs.b.norm &&
rdp->rcu_qs_ctr_snap == __this_cpu_read(rcu_qs_ctr)) || //no qs并且没有被调度
rdp->gpnum != rnp->gpnum || rnp->completed == rnp->gpnum || //说明已经发现qs,但是gp还没到,或是已经结束
rdp->gpwrap) {
//gp已经结束了,不用上报当前qs
rdp->cpu_no_qs.b.norm = true; /* need qs for new gp. */
rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_qs_ctr);
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
return;
}
mask = rdp->grpmask;
if ((rnp->qsmask & mask) == 0) { //已经上报过
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
} else {
rdp->core_needs_qs = false;
/*
* This GP can't end until cpu checks in, so all of our
* callbacks can be processed during the next GP.
*/
needwake = rcu_accelerate_cbs(rsp, rnp, rdp);
rcu_report_qs_rnp(mask, rsp, rnp, rnp->gpnum, flags); //上报到mynode
/* ^^^ Released rnp->lock */
if (needwake)
rcu_gp_kthread_wake(rsp);
}
//rcu_report_qs_rnp最后如果一直上报到根节点,表示所有qs已经完成, 调用rcu_report_qs_rsp通知rsp, 设置RCU_GP_FLAG_FQS并唤醒gp kthread
static void rcu_report_qs_rsp(struct rcu_state *rsp, unsigned long flags)
__releases(rcu_get_root(rsp)->lock)
{
WARN_ON_ONCE(!rcu_gp_in_progress(rsp));
WRITE_ONCE(rsp->gp_flags, READ_ONCE(rsp->gp_flags) | RCU_GP_FLAG_FQS);
raw_spin_unlock_irqrestore_rcu_node(rcu_get_root(rsp), flags);
swake_up(&rsp->gp_wq); /* Memory barrier implied by swake_up() path. */
}

gp handling

In the softirq path, __rcu_process_callbacks calls rcu_start_gp when a new gp needs to be started.

cpu_needs_another_gp - conditions for starting a new gp

With no-CBs CPUs, a gp can also be initiated from the rcuo kthread: nocb_leader_wait calls rcu_nocb_wait_gp, which in turn calls rcu_start_future_gp.

/*
* Does the current CPU require a not-yet-started grace period?
* The caller must have disabled interrupts to prevent races with
* normal callback registry.
*/
static bool
cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
{
int i;
if (rcu_gp_in_progress(rsp)) //还有gp在处理
return false; /* No, a grace period is already in progress. */
if (rcu_future_needs_gp(rsp)) //没有gp在处理,但是有下一个gp任务在等待, 说明需要创建新的gp
return true; /* Yes, a no-CBs CPU needs one. */
if (!rdp->nxttail[RCU_NEXT_TAIL]) //没有下一个gp在等待,且no-cbs的cpu不能处理
return false; /* No, this is a no-CBs (or offline) CPU. */
if (*rdp->nxttail[RCU_NEXT_READY_TAIL]) //没有gp在处理,也没有标记等待的下一个gp,但是下一个gp的callback list中有
return true; /* Yes, CPU has newly registered callbacks. */
for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
if (rdp->nxttail[i - 1] != rdp->nxttail[i] && //非空
ULONG_CMP_LT(READ_ONCE(rsp->completed), //说明有更大的gp number在等待
rdp->nxtcompleted[i]))
return true; /* Yes, CBs for future grace period. */
return false; /* No grace period needed. */
}

rcu_start_gp

static bool rcu_start_gp(struct rcu_state *rsp)
{
struct rcu_data *rdp = this_cpu_ptr(rsp->rda);
struct rcu_node *rnp = rcu_get_root(rsp);
bool ret = false;
//先advanced callback,因为如果当前没有gp在处理的话,下一个gp到来就能满足当前callback,并且在调用cpu_needs_another_gp()前更新最新信息
ret = rcu_advance_cbs(rsp, rnp, rdp) || ret;
ret = rcu_start_gp_advanced(rsp, rnp, rdp) || ret;
return ret;
}

rcu_advance_cbs

rcu_advance_cbs compares the ->completed values of the rcu_node and the rcu_data to see whether a gp has finished; if so, it moves the corresponding callbacks to RCU_DONE_TAIL and fixes up the other sublists.
Finally it calls rcu_accelerate_cbs, which assigns a gp number to the callbacks in RCU_NEXT_TAIL that do not have one yet.

rcu_advance_cbs is called mainly from rcu_start_gp, rcu_gp_cleanup (run by the gp kthread once a gp has completed) and note_gp_changes;
only in note_gp_changes may rnp be a non-root rcu_node.

static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
struct rcu_data *rdp)
{
int i, j;
/* If the CPU has no callbacks, nothing to do. */
if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
return false;
/*
* Find all callbacks whose ->completed numbers indicate that they
* are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
*/
for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i])) //gp从root向下传播,如果node中completed比较大,则表示有gp到期,把callback移动到RCU_DONE_TAIL
break;
rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
}
/* Clean up any sublist tail pointers that were misordered above. */
for (j = RCU_WAIT_TAIL; j < i; j++)
rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL]; //把相关指针指向done,置为空
/* Copy down callbacks to fill in empty sublists. */ //比如说RCU_NEXT_READY_TAIL中有,RCU_WAIT_TAIL中没有的情况
for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) { //把后面的sublist往前移一步,消除漏洞,可以避免rcu_accelerate_cbs分配新的gp number需要的调用次数,因为RCU_NEXT_READY_TAIL队列非空
if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
break; //没有新的callback了
rdp->nxttail[j] = rdp->nxttail[i];
rdp->nxtcompleted[j] = rdp->nxtcompleted[i];
}
/* Classify any remaining callbacks. */
return rcu_accelerate_cbs(rsp, rnp, rdp);
}

rcu_accelerate_cbs

It calls rcu_cbs_completed to get the gp number for new callbacks, moves them into the matching sublist and updates the corresponding nxtcompleted[] entries.

static unsigned long rcu_cbs_completed(struct rcu_state *rsp,
struct rcu_node *rnp)
{
/*
* If RCU is idle, we just wait for the next grace period.
* But we can only be sure that RCU is idle if we are looking
* at the root rcu_node structure -- otherwise, a new grace
* period might have started, but just not yet gotten around
* to initializing the current non-root rcu_node structure.
*/
if (rcu_get_root(rsp) == rnp && rnp->gpnum == rnp->completed) //root中的gp是最新信息,如果当前rcu明确是idle的,则新callback将在下一个gp被调用
return rnp->completed + 1;
/*
* Otherwise, wait for a possible partial grace period and
* then the subsequent full grace period.
*/
return rnp->completed + 2; //这里说明有可能有gp正在处理,加上新的gp号,就需要+2
}
static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
struct rcu_data *rdp)
{
unsigned long c;
int i;
bool ret;
/* If the CPU has no callbacks, nothing to do. */
if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
return false;
/*
* Starting from the sublist containing the callbacks most
* recently assigned a ->completed number and working down, find the
* first sublist that is not assignable to an upcoming grace period.
* Such a sublist has something in it (first two tests) and has
* a ->completed number assigned that will complete sooner than
* the ->completed number for newly arrived callbacks (last test).
*
* The key point is that any later sublist can be assigned the
* same ->completed number as the newly arrived callbacks, which
* means that the callbacks in any of these later sublist can be
* grouped into a single sublist, whether or not they have already
* been assigned a ->completed number.
*/
c = rcu_cbs_completed(rsp, rnp); //一个full gp间隔需要的下一个number
for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--) //RCU_NEXT_TAIL中cb未分配gp number, RCU_DONE_TAIL中的也没有意义
if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
!ULONG_CMP_GE(rdp->nxtcompleted[i], c)) //找到一个非空sublist,并且nxtcompleted要比c小
break;
/*
* If there are no sublist for unassigned callbacks, leave.
* At the same time, advance "i" one sublist, so that "i" will
* index into the sublist where all the remaining callbacks should
* be grouped into.
*/
if (++i >= RCU_NEXT_TAIL) // i+1的sublist就能存放c,如果等于RCU_NEXT_TAIL,这时候就是新callback的默认行为
return false;
/*
* Assign all subsequent callbacks' ->completed number to the next
* full grace period and group them all in the sublist initially
* indexed by "i".
*/
for (; i <= RCU_NEXT_TAIL; i++) { //把新callback从RCU_NEXT_TAIL移动到i
rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
rdp->nxtcompleted[i] = c;
}
/* Record any needed additional grace periods. */
ret = rcu_start_future_gp(rnp, rdp, NULL);
return ret;
}

rcu_start_future_gp

/*
* Start some future grace period, as needed to handle newly arrived
* callbacks. The required future grace periods are recorded in each
* rcu_node structure's ->need_future_gp field. Returns true if there
* is reason to awaken the grace-period kthread.
*
* The caller must hold the specified rcu_node structure's ->lock.
*/
static bool __maybe_unused
rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
unsigned long *c_out)
{
unsigned long c;
int i;
bool ret = false;
struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
/*
* Pick up grace-period number for new callbacks. If this
* grace period is already marked as needed, return to the caller.
*/
c = rcu_cbs_completed(rdp->rsp, rnp);
if (rnp->need_future_gp[c & 0x1]) { //已经标记过
goto out;
}
if (rnp->gpnum != rnp->completed || //gp正在运行,或者还没被更新
READ_ONCE(rnp_root->gpnum) != READ_ONCE(rnp_root->completed)) { //已经获得rnp lock,可以同步访问最新值,防止新的gp完成
rnp->need_future_gp[c & 0x1]++; //标记需要的gp
goto out;
}
//更新root的need_future_gp信息,获取root锁
/*
* There might be no grace period in progress. If we don't already
* hold it, acquire the root rcu_node structure's lock in order to
* start one (if needed).
*/
if (rnp != rnp_root)
raw_spin_lock_rcu_node(rnp_root); //rnp的锁已经在外层获取
/*
* Get a new grace-period number. If there really is no grace
* period in progress, it will be smaller than the one we obtained
* earlier. Adjust callbacks as needed. Note that even no-CBs
* CPUs have a ->nxtcompleted[] array, so no no-CBs checks needed.
*/
c = rcu_cbs_completed(rdp->rsp, rnp_root); //重新获取gp number,这次获取的是root的最新gp状态,c可能比第一次获取的小
for (i = RCU_DONE_TAIL; i < RCU_NEXT_TAIL; i++)
if (ULONG_CMP_LT(c, rdp->nxtcompleted[i])) //新的c较小,调整
rdp->nxtcompleted[i] = c;
/*
* If the needed for the required grace period is already
* recorded, trace and leave.
*/
if (rnp_root->need_future_gp[c & 0x1]) { //已经记录了该gp number
goto unlock_out;
}
/* Record the need for the future grace period. */
rnp_root->need_future_gp[c & 0x1]++; //更新root中的future gp request
/* If a grace period is not already in progress, start one. */
if (rnp_root->gpnum != rnp_root->completed) {
} else {
ret = rcu_start_gp_advanced(rdp->rsp, rnp_root, rdp); //如果没有gp在处理,初始化一个新的gp
}
unlock_out:
if (rnp != rnp_root)
raw_spin_unlock_rcu_node(rnp_root);
out:
if (c_out != NULL)
*c_out = c;
return ret;
}

rcu_start_gp_advanced

rcu_start_gp_advanced must be called with the root node's lock held; only then may it initialize a new gp in rcu_state. On success it returns true, meaning the gp kthread must be woken to process the new gp.
It relies on cpu_needs_another_gp to decide whether a new gp is actually needed.

/*
* Start a new RCU grace period if warranted, re-initializing the hierarchy
* in preparation for detecting the next grace period. The caller must hold
* the root node's ->lock and hard irqs must be disabled.
*
* Note that it is legal for a dying CPU (which is marked as offline) to
* invoke this function. This can happen when the dying CPU reports its
* quiescent state.
*
* Returns true if the grace-period kthread must be awakened.
*/
static bool
rcu_start_gp_advanced(struct rcu_state *rsp, struct rcu_node *rnp,
struct rcu_data *rdp)
{
if (!rsp->gp_kthread || !cpu_needs_another_gp(rsp, rdp)) {
/*
* Either we have not yet spawned the grace-period
* task, this CPU does not need another grace period,
* or a grace period is already in progress.
* Either way, don't start a new grace period.
*/
return false;
}
WRITE_ONCE(rsp->gp_flags, RCU_GP_FLAG_INIT); //初始化一个新的gp
/*
* We can't do wakeups while holding the rnp->lock, as that
* could cause possible deadlocks with the rq->lock. Defer
* the wakeup to our caller.
*/
return true;
}

rcu_sched/rcu_bh gp kthread

rcu_gp_kthread mostly sleeps, waiting for a new gp to be requested via RCU_GP_FLAG_INIT.
It then loops until the current gp completes, and on leaving that loop it does the cleanup: callbacks become invocable and/or the nocb leaders are woken.
rcu_gp_init mainly increments rsp->gpnum and initializes qsmask in every rcu_node.

static int __noreturn rcu_gp_kthread(void *arg)
{
bool first_gp_fqs;
int gf;
unsigned long j;
int ret;
struct rcu_state *rsp = arg;
struct rcu_node *rnp = rcu_get_root(rsp);
rcu_bind_gp_kthread(); //运行在timekeeping cpu上
for (;;) {
/* Handle grace-period start. */
for (;;) { //等待一个新的gp初始化
rsp->gp_state = RCU_GP_WAIT_GPS;
swait_event_interruptible(rsp->gp_wq,
READ_ONCE(rsp->gp_flags) &
RCU_GP_FLAG_INIT); //等待一个新的gp
rsp->gp_state = RCU_GP_DONE_GPS;
/* Locking provides needed memory barrier. */
if (rcu_gp_init(rsp))
break;
cond_resched_rcu_qs();
WRITE_ONCE(rsp->gp_activity, jiffies);
WARN_ON(signal_pending(current));
}
/* Handle quiescent-state forcing. */
first_gp_fqs = true;
j = jiffies_till_first_fqs;
if (j > HZ) {
j = HZ;
jiffies_till_first_fqs = HZ;
}
ret = 0;
for (;;) { //循环到当前gp结束
if (!ret) {
rsp->jiffies_force_qs = jiffies + j; //1s
WRITE_ONCE(rsp->jiffies_kick_kthreads,
jiffies + 3 * j); //3s
}
rsp->gp_state = RCU_GP_WAIT_FQS;
ret = swait_event_interruptible_timeout(rsp->gp_wq,
rcu_gp_fqs_check_wake(rsp, &gf), j); //如果有fqs信号,或者当前gp完成了,或超时,或者收到信号则唤醒
rsp->gp_state = RCU_GP_DOING_FQS;
/* Locking provides needed memory barriers. */
/* If grace period done, leave loop. */
if (!READ_ONCE(rnp->qsmask) && //root node没有qs要等,表示gp结束了
!rcu_preempt_blocked_readers_cgp(rnp))
break;
/* If time for quiescent-state forcing, do it. */
if (ULONG_CMP_GE(jiffies, rsp->jiffies_force_qs) || //需要强制qs
(gf & RCU_GP_FLAG_FQS)) {
rcu_gp_fqs(rsp, first_gp_fqs); //强制qs
first_gp_fqs = false;
trace_rcu_grace_period(rsp->name,
READ_ONCE(rsp->gpnum),
TPS("fqsend"));
cond_resched_rcu_qs();
WRITE_ONCE(rsp->gp_activity, jiffies);
ret = 0; /* Force full wait till next FQS. */
j = jiffies_till_next_fqs;
if (j > HZ) {
j = HZ;
jiffies_till_next_fqs = HZ;
} else if (j < 1) {
j = 1;
jiffies_till_next_fqs = 1;
}
} else {
/* Deal with stray signal. */ //被信号唤醒,更新超时时间,继续阻塞等待
cond_resched_rcu_qs();
WRITE_ONCE(rsp->gp_activity, jiffies);
WARN_ON(signal_pending(current));
ret = 1; /* Keep old FQS timing. */
j = jiffies;
if (time_after(jiffies, rsp->jiffies_force_qs))
j = 1;
else
j = rsp->jiffies_force_qs - j;
}
}
// gp完成后处理
/* Handle grace-period end. */
rsp->gp_state = RCU_GP_CLEANUP;
rcu_gp_cleanup(rsp);
rsp->gp_state = RCU_GP_CLEANED;
}
}

forcing qs

Forcing quiescent states gives CPUs sitting in tickless idle a chance to have their qs noticed.
RCU_GP_FLAG_FQS is set in rcu_report_qs_rsp and in force_quiescent_state (called from __call_rcu_core when many callbacks are pending),
or forcing kicks in when the current gp still has not completed after a short interval;
either way, rcu_gp_fqs ends up being called.

rcu_gp_fqs

On the first fqs pass of a gp, dyntick_save_progress_counter checks which CPUs are in dyntick idle; force_qs_rnp then reports a qs for all of those idle CPUs via rcu_report_qs_rnp.
On later passes, rcu_implicit_dynticks_qs uses more elaborate means to decide whether a CPU has passed through a qs.

static void rcu_gp_fqs(struct rcu_state *rsp, bool first_time)
{
bool isidle = false;
unsigned long maxj;
struct rcu_node *rnp = rcu_get_root(rsp);
WRITE_ONCE(rsp->gp_activity, jiffies);
rsp->n_force_qs++;
if (first_time) {
/* Collect dyntick-idle snapshots. */
if (is_sysidle_rcu_state(rsp)) {
isidle = true;
maxj = jiffies - ULONG_MAX / 4;
}
force_qs_rnp(rsp, dyntick_save_progress_counter,
&isidle, &maxj);
rcu_sysidle_report_gp(rsp, isidle, maxj);
} else {
/* Handle dyntick-idle and offline CPUs. */
isidle = true;
force_qs_rnp(rsp, rcu_implicit_dynticks_qs, &isidle, &maxj);
}
/* Clear flag to prevent immediate re-entry. */
if (READ_ONCE(rsp->gp_flags) & RCU_GP_FLAG_FQS) { //清空forcing qs标记,因为外层循环正在相关处理
raw_spin_lock_irq_rcu_node(rnp);
WRITE_ONCE(rsp->gp_flags,
READ_ONCE(rsp->gp_flags) & ~RCU_GP_FLAG_FQS);
raw_spin_unlock_irq_rcu_node(rnp);
}
}
static int rcu_implicit_dynticks_qs(struct rcu_data *rdp,
bool *isidle, unsigned long *maxj)
{
unsigned int curr;
int *rcrmp;
unsigned int snap;
curr = (unsigned int)atomic_add_return(0, &rdp->dynticks->dynticks);
snap = (unsigned int)rdp->dynticks_snap;
if ((curr & 0x1) == 0 || UINT_CMP_GE(curr, snap + 2)) { //idle, 或在当前gp内调度过,说明有qs
trace_rcu_fqs(rdp->rsp->name, rdp->gpnum, rdp->cpu, TPS("dti"));
rdp->dynticks_fqs++;
return 1;
}
//还不算久
if (ULONG_CMP_GE(rdp->rsp->gp_start + 2, jiffies))
return 0; /* Grace period is not old enough. */
barrier();
if (cpu_is_offline(rdp->cpu)) {
rdp->offline_fqs++;
return 1;
}
/*
* A CPU running for an extended time within the kernel can
* delay RCU grace periods. When the CPU is in NO_HZ_FULL mode,
* even context-switching back and forth between a pair of
* in-kernel CPU-bound tasks cannot advance grace periods.
* So if the grace period is old enough, make the CPU pay attention.
* Note that the unsynchronized assignments to the per-CPU
* rcu_sched_qs_mask variable are safe. Yes, setting of
* bits can be lost, but they will be set again on the next
* force-quiescent-state pass. So lost bit sets do not result
* in incorrect behavior, merely in a grace period lasting
* a few jiffies longer than it might otherwise. Because
* there are at most four threads involved, and because the
* updates are only once every few jiffies, the probability of
* lossage (and thus of slight grace-period extension) is
* quite low.
*
* Note that if the jiffies_till_sched_qs boot/sysfs parameter
* is set too high, we override with half of the RCU CPU stall
* warning delay.
*/
//已经有一段时间了,设置rcu_sched_qs_mask标记
rcrmp = &per_cpu(rcu_sched_qs_mask, rdp->cpu);
if (ULONG_CMP_GE(jiffies,
rdp->rsp->gp_start + jiffies_till_sched_qs) ||
ULONG_CMP_GE(jiffies, rdp->rsp->jiffies_resched)) {
if (!(READ_ONCE(*rcrmp) & rdp->rsp->flavor_mask)) {
WRITE_ONCE(rdp->cond_resched_completed,
READ_ONCE(rdp->mynode->completed));
smp_mb(); /* ->cond_resched_completed before *rcrmp. */
WRITE_ONCE(*rcrmp,
READ_ONCE(*rcrmp) + rdp->rsp->flavor_mask);
}
rdp->rsp->jiffies_resched += 5; /* Re-enable beating. */
}
//已经太长时间了,直接扔给调度器
/* And if it has been a really long time, kick the CPU as well. */
if (ULONG_CMP_GE(jiffies,
rdp->rsp->gp_start + 2 * jiffies_till_sched_qs) ||
ULONG_CMP_GE(jiffies, rdp->rsp->gp_start + jiffies_till_sched_qs))
resched_cpu(rdp->cpu); /* Force CPU into scheduler. */
return 0;
}

The main thing to note in rcu_implicit_dynticks_qs is the use of rcu_sched_qs_mask.
At scheduling points, rcu_note_voluntary_context_switch and rcu_note_context_switch call rcu_momentary_dyntick_idle if the current CPU has its rcu_sched_qs_mask bit set.
rcu_momentary_dyntick_idle adds 2 to that CPU's dynticks counter; without changing the idle/non-idle parity, the later comparison against rdp->dynticks_snap then shows that the CPU went through a scheduling point, which can be counted as a qs.
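A concrete walk-through of the snapshot check in rcu_implicit_dynticks_qs (the numbers are illustrative): suppose the CPU was busy when the gp started, so dynticks_snap = 5 (odd = non-idle). If the CPU later enters dyntick idle, the counter becomes even and the (curr & 0x1) == 0 test reports a qs; if instead the CPU hits rcu_momentary_dyntick_idle(), the counter jumps to 7, UINT_CMP_GE(7, 5 + 2) holds, and the qs is reported even though the CPU was never idle.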

noCBs

nocbs starts a callback-offload kthread per flavor on each CPU; when a gp completes, the gp kthread tells the rcuo kthreads to invoke the callbacks.
Note that with a large number of CPUs the gp kthread would have to wake a large number of rcuo kthreads, which delays the gp,
so a leader-follower model was introduced: the gp kthread only wakes the leader rcuo kthreads, and each leader then wakes its followers.

rcu_spawn_all_nocb_kthreads starts an rcuo kthread per flavor on each CPU;
rcu_init_nohz then calls rcu_organize_nocb_kthreads to set up the leader-follower relationships.
By default each leader manages roughly the square root of nr_cpu_ids followers.
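
As a concrete illustration (assuming the default rcu_nocb_leader_stride): on a 64-CPU machine the stride is int_sqrt(64) = 8, so CPU 0's rcuo kthread leads those of CPUs 0-7, CPU 8's leads CPUs 8-15, and so on; per flavor the gp kthread then only has to wake 8 leaders instead of 64 kthreads.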

static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
bool lazy, unsigned long flags)
{
if (!rcu_is_nocb_cpu(rdp->cpu))
return false;
__call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); //存放到rdp->nocb_head队列中
/*
* If called from an extended quiescent state with interrupts
* disabled, invoke the RCU core in order to allow the idle-entry
* deferred-wakeup check to function.
*/
if (irqs_disabled_flags(flags) &&
!rcu_is_watching() && //idle 关中断, 延迟到在软中断上下文中检测是否需要被唤醒
cpu_online(smp_processor_id()))
invoke_rcu_core();
return true;
}
static void __call_rcu_nocb_enqueue(struct rcu_data *rdp,
struct rcu_head *rhp,
struct rcu_head **rhtp,
int rhcount, int rhcount_lazy,
unsigned long flags)
{
int len;
struct rcu_head **old_rhpp;
struct task_struct *t;
/* Enqueue the callback on the nocb list and update counts. */
// rhp callback插入nocb_tail尾
atomic_long_add(rhcount, &rdp->nocb_q_count);
old_rhpp = xchg(&rdp->nocb_tail, rhtp);
WRITE_ONCE(*old_rhpp, rhp);
atomic_long_add(rhcount_lazy, &rdp->nocb_q_count_lazy);
smp_mb__after_atomic(); /* Store *old_rhpp before _wake test. */
/* If we are not being polled and there is a kthread, awaken it ... */
t = READ_ONCE(rdp->nocb_kthread);
if (rcu_nocb_poll || !t) {
return;
}
len = atomic_long_read(&rdp->nocb_q_count);
if (old_rhpp == &rdp->nocb_head) { //第一个callback被插入,需要唤醒
if (!irqs_disabled_flags(flags)) {
/* ... if queue was empty ... */
wake_nocb_leader(rdp, false); //唤醒leader
} else {
rdp->nocb_defer_wakeup = RCU_NOGP_WAKE; //正在关中断,需要快速结束,推迟到软中断中处理唤醒
}
rdp->qlen_last_fqs_check = 0;
} else if (len > rdp->qlen_last_fqs_check + qhimark) { //callback太多
/* ... or if many callbacks queued. */
if (!irqs_disabled_flags(flags)) {
wake_nocb_leader(rdp, true);
} else {
rdp->nocb_defer_wakeup = RCU_NOGP_WAKE_FORCE; //强制唤醒
}
rdp->qlen_last_fqs_check = LONG_MAX / 2;
} else {
}
return;
}
/*
* Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes
* callbacks queued by the corresponding no-CBs CPU, however, there is
* an optional leader-follower relationship so that the grace-period
* kthreads don't have to do quite so many wakeups.
*/
static int rcu_nocb_kthread(void *arg)
{
int c, cl;
struct rcu_head *list;
struct rcu_head *next;
struct rcu_head **tail;
struct rcu_data *rdp = arg;
/* Each pass through this loop invokes one batch of callbacks */
for (;;) {
/* Wait for callbacks. */
// leader 和 follower阻塞等待,返回的时候都必须有callback存放到nocb_follower_head
if (rdp->nocb_leader == rdp)
nocb_leader_wait(rdp);
else
nocb_follower_wait(rdp);
/* Pull the ready-to-invoke callbacks onto local list. */
list = READ_ONCE(rdp->nocb_follower_head);
WRITE_ONCE(rdp->nocb_follower_head, NULL);
tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);
/* Each pass through the following loop invokes a callback. */
//遍历callback列表,调用callback
c = cl = 0;
while (list) {
next = list->next;
/* Wait for enqueuing to complete, if needed. */
while (next == NULL && &list->next != tail) {
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WaitQueue"));
schedule_timeout_interruptible(1);
trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
TPS("WokeQueue"));
next = list->next;
}
local_bh_disable(); //因为有可能是rcu_bh_state所以要关闭
if (__rcu_reclaim(rdp->rsp->name, list)) //调用callback或者kfree
cl++;
c++;
local_bh_enable();
list = next;
}
smp_mb__before_atomic(); /* _add after CB invocation. */
atomic_long_add(-c, &rdp->nocb_q_count);
atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
rdp->n_nocbs_invoked += c;
}
return 0;
}
/*
* Leaders come here to wait for additional callbacks to show up.
* This function does not return until callbacks appear.
*/
static void nocb_leader_wait(struct rcu_data *my_rdp)
{
bool gotcbs;
struct rcu_data *rdp;
struct rcu_head **tail;
wait_again:
/* Wait for callbacks to appear. */
if (!rcu_nocb_poll) {
swait_event_interruptible(my_rdp->nocb_wq,
!READ_ONCE(my_rdp->nocb_leader_sleep));
/* Memory barrier handled by smp_mb() calls below and repoll. */
}
// leader被唤醒,或者poll模式
gotcbs = false;
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
rdp->nocb_gp_head = READ_ONCE(rdp->nocb_head); //cbs 移动到nocb_gp_head中
if (!rdp->nocb_gp_head)
continue; /* No CBs here, try next follower. */
/* Move callbacks to wait-for-GP list, which is empty. */
WRITE_ONCE(rdp->nocb_head, NULL);
rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
gotcbs = true;
}
// 没有cbs,则继续wait
if (unlikely(!gotcbs)) {
if (!rcu_nocb_poll)
trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
"WokeEmpty");
WARN_ON(signal_pending(current));
schedule_timeout_interruptible(1);
/* Rescan in case we were a victim of memory ordering. */
my_rdp->nocb_leader_sleep = true;
smp_mb(); /* Ensure _sleep true before scan. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
if (READ_ONCE(rdp->nocb_head)) {
/* Found CB, so short-circuit next wait. */
my_rdp->nocb_leader_sleep = false;
break;
}
goto wait_again;
}
/* Wait for one grace period. */
rcu_nocb_wait_gp(my_rdp); //阻塞等待一个gp
/*
* We left ->nocb_leader_sleep unset to reduce cache thrashing.
* We set it now, but recheck for new callbacks while
* traversing our follower list.
*/
my_rdp->nocb_leader_sleep = true;
smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */
/* Each pass through the following loop wakes a follower, if needed. */
for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
if (READ_ONCE(rdp->nocb_head))
my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
if (!rdp->nocb_gp_head)
continue; /* No CBs, so no need to wake follower. */
/* Append callbacks to follower's "done" list. */
tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail); //nocb_gp_head-nocb_gp_tail转存到nocb_follower_head-nocb_follower_tail中
*tail = rdp->nocb_gp_head;
smp_mb__after_atomic(); /* Store *tail before wakeup. */
if (rdp != my_rdp && tail == &rdp->nocb_follower_head) { //nocb_follower_head之前为空,则唤醒follower
/*
* List was empty, wake up the follower.
* Memory barriers supplied by atomic_long_add().
*/
swake_up(&rdp->nocb_wq);
}
}
/* If we (the leader) don't have CBs, go wait some more. */
if (!my_rdp->nocb_follower_head) //leader没有cbs,继续wait
goto wait_again;
}
/*
* Followers come here to wait for additional callbacks to show up.
* This function does not return until callbacks appear.
*/
static void nocb_follower_wait(struct rcu_data *rdp)
{
for (;;) {
if (!rcu_nocb_poll) {
swait_event_interruptible(rdp->nocb_wq,
READ_ONCE(rdp->nocb_follower_head));
}
if (smp_load_acquire(&rdp->nocb_follower_head)) { //有callback才返回
/* ^^^ Ensure CB invocation follows _head test. */
return;
}
schedule_timeout_interruptible(1);
}
}
/*
* If necessary, kick off a new grace period, and either way wait
* for a subsequent grace period to complete.
*/
static void rcu_nocb_wait_gp(struct rcu_data *rdp)
{
unsigned long c;
bool d;
unsigned long flags;
bool needwake;
struct rcu_node *rnp = rdp->mynode;
raw_spin_lock_irqsave_rcu_node(rnp, flags);
needwake = rcu_start_future_gp(rnp, rdp, &c); //标记一个新的gp,c为新的gp number
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
if (needwake)
rcu_gp_kthread_wake(rdp->rsp); //gp完成唤醒gp kthread
for (;;) {
swait_event_interruptible( //阻塞直到c完成或者收到信号
rnp->nocb_gp_wq[c & 0x1],
(d = ULONG_CMP_GE(READ_ONCE(rnp->completed), c)));
if (likely(d))
break;
}
smp_mb(); /* Ensure that CB invocation happens after GP end. */
}

expedited gp

void synchronize_sched_expedited(void)
{
unsigned long s;
struct rcu_state *rsp = &rcu_sched_state;
/* If only one CPU, this is automatically a grace period. */
if (rcu_blocking_is_gp()) //单cpu,gp完成
return;
/* If expedited grace periods are prohibited, fall back to normal. */
if (rcu_gp_is_normal()) { //rcu_normal开启,不允许加速
wait_rcu_gp(call_rcu_sched);
return;
}
/* Take a snapshot of the sequence number. */
s = rcu_exp_gp_seq_snap(rsp); //rsp->expedited_sequence
if (exp_funnel_lock(rsp, s))
return; /* Someone else did our work for us. */ //其他人加速了当前gp
/* Initialize the rcu_node tree in preparation for the wait. */
sync_rcu_exp_select_cpus(rsp, sync_sched_exp_handler);
/* Wait and clean up, including waking everyone. */
rcu_exp_wait_wake(rsp, s);
}
static bool exp_funnel_lock(struct rcu_state *rsp, unsigned long s)
{
struct rcu_data *rdp = per_cpu_ptr(rsp->rda, raw_smp_processor_id());
struct rcu_node *rnp = rdp->mynode;
struct rcu_node *rnp_root = rcu_get_root(rsp);
/* Low-contention fastpath. */
if (ULONG_CMP_LT(READ_ONCE(rnp->exp_seq_rq), s) && //exp_seq_rq<=s说明不需要等别人
(rnp == rnp_root ||
ULONG_CMP_LT(READ_ONCE(rnp_root->exp_seq_rq), s)) &&
!mutex_is_locked(&rsp->exp_mutex) &&
mutex_trylock(&rsp->exp_mutex)) //并且如果获取到exp_mutex则开始加速,
goto fastpath;
/*
* Each pass through the following loop works its way up
* the rcu_node tree, returning if others have done the work or
* otherwise falls through to acquire rsp->exp_mutex. The mapping
* from CPU to rcu_node structure can be inexact, as it is just
* promoting locality and is not strictly needed for correctness.
*/
for (; rnp != NULL; rnp = rnp->parent) {
if (sync_exp_work_done(rsp, &rdp->exp_workdone1, s))
return true;
/* Work not done, either wait here or go up. */
spin_lock(&rnp->exp_lock);
if (ULONG_CMP_GE(rnp->exp_seq_rq, s)) { //exp_seq_rq被其他人更新过了,因此等待
/* Someone else doing GP, so wait for them. */
spin_unlock(&rnp->exp_lock);
trace_rcu_exp_funnel_lock(rsp->name, rnp->level,
rnp->grplo, rnp->grphi,
TPS("wait"));
wait_event(rnp->exp_wq[(s >> 1) & 0x3],
sync_exp_work_done(rsp,
&rdp->exp_workdone2, s));
return true;
}
rnp->exp_seq_rq = s; /* Followers can wait on us. */ //更新,其他人等待被唤醒
spin_unlock(&rnp->exp_lock);
trace_rcu_exp_funnel_lock(rsp->name, rnp->level, rnp->grplo,
rnp->grphi, TPS("nxtlvl"));
}
mutex_lock(&rsp->exp_mutex);
fastpath: //直接到fast path说明是没有其他人在做exp, 否则就是说明其他人做完exp后被其唤醒
if (sync_exp_work_done(rsp, &rdp->exp_workdone3, s)) { //rsp->expedited_sequence >=s?
mutex_unlock(&rsp->exp_mutex);
return true;
}
rcu_exp_gp_seq_start(rsp); //expedited_sequence+1
return false;
}
/*
* Select the nodes that the upcoming expedited grace period needs
* to wait for.
*/
static void sync_rcu_exp_select_cpus(struct rcu_state *rsp,
smp_call_func_t func)
{
int cpu;
unsigned long flags;
unsigned long mask;
unsigned long mask_ofl_test;
unsigned long mask_ofl_ipi;
int ret;
struct rcu_node *rnp;
sync_exp_reset_tree(rsp);
rcu_for_each_leaf_node(rsp, rnp) {
raw_spin_lock_irqsave_rcu_node(rnp, flags);
/* Each pass checks a CPU for identity, offline, and idle. */
mask_ofl_test = 0;
for (cpu = rnp->grplo; cpu <= rnp->grphi; cpu++) {
struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
if (raw_smp_processor_id() == cpu || //当前cpu
!(atomic_add_return(0, &rdtp->dynticks) & 0x1)) //idle
mask_ofl_test |= rdp->grpmask;
}
mask_ofl_ipi = rnp->expmask & ~mask_ofl_test; //去掉offline和idle节点
/*
* Need to wait for any blocked tasks as well. Note that
* additional blocking tasks will also block the expedited
* GP until such time as the ->expmask bits are cleared.
*/
if (rcu_preempt_has_tasks(rnp))
rnp->exp_tasks = rnp->blkd_tasks.next;
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
/* IPI the remaining CPUs for expedited quiescent state. */
mask = 1;
for (cpu = rnp->grplo; cpu <= rnp->grphi; cpu++, mask <<= 1) {
if (!(mask_ofl_ipi & mask))
continue;
retry_ipi:
ret = smp_call_function_single(cpu, func, rsp, 0); //发送ipi在特定cpu上调度执行sync_sched_exp_handler
if (!ret) {
mask_ofl_ipi &= ~mask;
continue;
}
// 检测对应cpu是否不在线处理
...
}
/* Report quiescent states for those that went offline. */
mask_ofl_test |= mask_ofl_ipi;
if (mask_ofl_test)
rcu_report_exp_cpu_mult(rsp, rnp, mask_ofl_test, false); //报告idle和offline的cpu,去掉expmask对应位
}
}
/* Invoked on each online non-idle CPU for expedited quiescent state. */
static void sync_sched_exp_handler(void *data)
{
struct rcu_data *rdp;
struct rcu_node *rnp;
struct rcu_state *rsp = data;
rdp = this_cpu_ptr(rsp->rda);
rnp = rdp->mynode;
if (!(READ_ONCE(rnp->expmask) & rdp->grpmask) || //当前cpu不需要报告exp
__this_cpu_read(rcu_sched_data.cpu_no_qs.b.exp)) //当前cpu正在做exp
return;
if (rcu_is_cpu_rrupt_from_idle()) { // cpu idle直接报告qs,并唤醒rsp->expedited_wq
rcu_report_exp_rdp(&rcu_sched_state,
this_cpu_ptr(&rcu_sched_data), true);
return;
}
__this_cpu_write(rcu_sched_data.cpu_no_qs.b.exp, true); //标记正在exp
resched_cpu(smp_processor_id()); //当前cpu重新调度完成qs
}
//最后调用rcu_exp_wait_wake等待egp完成
static void rcu_exp_wait_wake(struct rcu_state *rsp, unsigned long s)
{
struct rcu_node *rnp;
synchronize_sched_expedited_wait(rsp); //阻塞等待rsp->expedited_wq被唤醒并检测egp完成
rcu_exp_gp_seq_end(rsp); //expedited_sequence+1, 标记egp完成
mutex_lock(&rsp->exp_wake_mutex);
mutex_unlock(&rsp->exp_mutex);
rcu_for_each_node_breadth_first(rsp, rnp) {
if (ULONG_CMP_LT(READ_ONCE(rnp->exp_seq_rq), s)) {
spin_lock(&rnp->exp_lock);
/* Recheck, avoid hang in case someone just arrived. */
if (ULONG_CMP_LT(rnp->exp_seq_rq, s))
rnp->exp_seq_rq = s; //更新exp_seq_rq
spin_unlock(&rnp->exp_lock);
}
wake_up_all(&rnp->exp_wq[(rsp->expedited_sequence >> 1) & 0x3]); //唤醒follower
}
mutex_unlock(&rsp->exp_wake_mutex);
}

rcu_read_lock_bh/rcu_read_unlock_bh,call_rcu_bh…

rcu_read_lock_bh and rcu_read_unlock_bh boil down to local_bh_disable/local_bh_enable,
and __do_softirq calls rcu_bh_qs() after running the softirq actions, reporting a qs.
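
A minimal usage sketch of the _bh flavor (the nf_rule structure and the function names are made up): the reader only disables softirqs, and the updater uses call_rcu_bh(), whose grace period only has to wait until every CPU has finished its pending softirq handlers:

#include <linux/rcupdate.h>
#include <linux/slab.h>

struct nf_rule {
	int match;
	struct rcu_head rcu;
};

static struct nf_rule __rcu *active_rule;

/* Reader: may run in process or softirq context. */
static int rule_matches(int pkt)
{
	struct nf_rule *r;
	int hit = 0;

	rcu_read_lock_bh();			/* local_bh_disable() under the hood */
	r = rcu_dereference_bh(active_rule);
	if (r)
		hit = (r->match == pkt);
	rcu_read_unlock_bh();			/* local_bh_enable() under the hood */
	return hit;
}

static void rule_free_cb(struct rcu_head *head)
{
	kfree(container_of(head, struct nf_rule, rcu));
}

/* Updater (assumed serialized by the caller): publish, then defer the free
 * until every CPU has completed its pending softirq handlers. */
static void rule_replace(struct nf_rule *new)
{
	struct nf_rule *old = rcu_dereference_protected(active_rule, 1);

	rcu_assign_pointer(active_rule, new);
	if (old)
		call_rcu_bh(&old->rcu, rule_free_cb);
}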

Initialization

rcu_init builds the tree and registers the RCU_SOFTIRQ handler; later, an early_initcall runs rcu_spawn_gp_kthread, which starts the rcu_sched and rcu_bh gp kthreads and the per-CPU rcuos/rcuob kthreads.

void __init rcu_init(void)
{
rcu_init_one(&rcu_bh_state);
rcu_init_one(&rcu_sched_state);
__rcu_init_preempt();
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
...
cpu_notifier(rcu_cpu_notify, 0);
for_each_online_cpu(cpu)
rcu_cpu_notify(NULL, CPU_UP_PREPARE, (void *)(long)cpu);
}
int rcu_cpu_notify(struct notifier_block *self,
unsigned long action, void *hcpu)
{
switch (action) {
case CPU_UP_PREPARE:
case CPU_UP_PREPARE_FROZEN:
rcu_prepare_cpu(cpu); //初始化rcu_data
rcu_prepare_kthreads(cpu);
rcu_spawn_all_nocb_kthreads(cpu);
break;
...
}
...
}
static void rcu_prepare_cpu(int cpu)
{
struct rcu_state *rsp;
for_each_rcu_flavor(rsp)
rcu_init_percpu_data(cpu, rsp); //初始化rcu_data, 如果是nocb_cpu则把RCU_NEXT_TAIL设置为空,不让callback入队
}
/*
* Spawn the kthreads that handle each RCU flavor's grace periods.
*/
static int __init rcu_spawn_gp_kthread(void)
{
unsigned long flags;
int kthread_prio_in = kthread_prio;
struct rcu_node *rnp;
struct rcu_state *rsp;
struct sched_param sp;
struct task_struct *t;
rcu_scheduler_fully_active = 1;
for_each_rcu_flavor(rsp) {
t = kthread_create(rcu_gp_kthread, rsp, "%s", rsp->name);
rnp = rcu_get_root(rsp);
raw_spin_lock_irqsave_rcu_node(rnp, flags);
rsp->gp_kthread = t;
if (kthread_prio) {
sp.sched_priority = kthread_prio;
sched_setscheduler_nocheck(t, SCHED_FIFO, &sp);
}
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
wake_up_process(t);
}
rcu_spawn_nocb_kthreads(); //rcuos,rcuob
rcu_spawn_boost_kthreads(); //开启抢占rcu的时候,避免reader的抢占导致当前gp时间过长
return 0;
}
early_initcall(rcu_spawn_gp_kthread);

Initializing the tree

RCU_FANOUT: number of children of a non-leaf node; defaults to the word size, i.e. 64 on a 64-bit system
RCU_FANOUT_LEAF: number of CPUs (i.e. rcu_data structures) handled by one leaf rcu_node; same default, 64 on 64-bit

The number of levels depends on NR_CPUS; at most 4 levels are supported, which on 64-bit covers up to 16777216 CPUs.
With NR_CPUS=64 the tree is a single level; with NR_CPUS=8192 it has 3 levels.
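Working through the macros below for NR_CPUS=8192 with the default fanout of 64: 8192 is larger than RCU_FANOUT_2 (4096) but not larger than RCU_FANOUT_3 (262144), so RCU_NUM_LVLS is 3, with NUM_RCU_LVL_0 = 1 root node, NUM_RCU_LVL_1 = DIV_ROUND_UP(8192, 4096) = 2 inner nodes and NUM_RCU_LVL_2 = DIV_ROUND_UP(8192, 64) = 128 leaf nodes, i.e. 131 rcu_node structures per flavor, each leaf covering 64 rcu_data structures.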

#define RCU_FANOUT_1 (RCU_FANOUT_LEAF) //64
#define RCU_FANOUT_2 (RCU_FANOUT_1 * RCU_FANOUT) //4096
#define RCU_FANOUT_3 (RCU_FANOUT_2 * RCU_FANOUT) //262144
#define RCU_FANOUT_4 (RCU_FANOUT_3 * RCU_FANOUT) //16777216
#if NR_CPUS <= RCU_FANOUT_1
# define RCU_NUM_LVLS 1
# define NUM_RCU_LVL_0 1
# define NUM_RCU_NODES NUM_RCU_LVL_0
# define NUM_RCU_LVL_INIT { NUM_RCU_LVL_0 }
# define RCU_NODE_NAME_INIT { "rcu_node_0" }
# define RCU_FQS_NAME_INIT { "rcu_node_fqs_0" }
#elif NR_CPUS <= RCU_FANOUT_2
# define RCU_NUM_LVLS 2
# define NUM_RCU_LVL_0 1
# define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
# define NUM_RCU_NODES (NUM_RCU_LVL_0 + NUM_RCU_LVL_1)
# define NUM_RCU_LVL_INIT { NUM_RCU_LVL_0, NUM_RCU_LVL_1 }
# define RCU_NODE_NAME_INIT { "rcu_node_0", "rcu_node_1" }
# define RCU_FQS_NAME_INIT { "rcu_node_fqs_0", "rcu_node_fqs_1" }
#elif NR_CPUS <= RCU_FANOUT_3
# define RCU_NUM_LVLS 3
# define NUM_RCU_LVL_0 1
# define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
# define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
# define NUM_RCU_NODES (NUM_RCU_LVL_0 + NUM_RCU_LVL_1 + NUM_RCU_LVL_2)
# define NUM_RCU_LVL_INIT { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2 }
# define RCU_NODE_NAME_INIT { "rcu_node_0", "rcu_node_1", "rcu_node_2" }
# define RCU_FQS_NAME_INIT { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2" }
#elif NR_CPUS <= RCU_FANOUT_4
# define RCU_NUM_LVLS 4
# define NUM_RCU_LVL_0 1
# define NUM_RCU_LVL_1 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_3)
# define NUM_RCU_LVL_2 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_2)
# define NUM_RCU_LVL_3 DIV_ROUND_UP(NR_CPUS, RCU_FANOUT_1)
# define NUM_RCU_NODES (NUM_RCU_LVL_0 + NUM_RCU_LVL_1 + NUM_RCU_LVL_2 + NUM_RCU_LVL_3)
# define NUM_RCU_LVL_INIT { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2, NUM_RCU_LVL_3 }
# define RCU_NODE_NAME_INIT { "rcu_node_0", "rcu_node_1", "rcu_node_2", "rcu_node_3" }
# define RCU_FQS_NAME_INIT { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2", "rcu_node_fqs_3" }
#endif
#define RCU_STATE_INITIALIZER(sname, sabbr, cr) \
DEFINE_RCU_TPS(sname) \
static DEFINE_PER_CPU_SHARED_ALIGNED(struct rcu_data, sname##_data); \
struct rcu_state sname##_state = { \
.level = { &sname##_state.node[0] }, \
.rda = &sname##_data, \
.call = cr, \
.gp_state = RCU_GP_IDLE, \
.gpnum = 0UL - 300UL, \
.completed = 0UL - 300UL, \
.orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
.orphan_nxttail = &sname##_state.orphan_nxtlist, \
.orphan_donetail = &sname##_state.orphan_donelist, \
.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
.name = RCU_STATE_NAME(sname), \
.abbr = sabbr, \
.exp_mutex = __MUTEX_INITIALIZER(sname##_state.exp_mutex), \
.exp_wake_mutex = __MUTEX_INITIALIZER(sname##_state.exp_wake_mutex), \
}
RCU_STATE_INITIALIZER(rcu_sched, 's', call_rcu_sched);
RCU_STATE_INITIALIZER(rcu_bh, 'b', call_rcu_bh);
static void __init rcu_init_one(struct rcu_state *rsp)
{
static const char * const buf[] = RCU_NODE_NAME_INIT;
static const char * const fqs[] = RCU_FQS_NAME_INIT;
static struct lock_class_key rcu_node_class[RCU_NUM_LVLS];
static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
static u8 fl_mask = 0x1;
int levelcnt[RCU_NUM_LVLS]; /* # nodes in each level. */
int levelspread[RCU_NUM_LVLS]; /* kids/node in each level. */
int cpustride = 1;
int i;
int j;
struct rcu_node *rnp;
rsp->flavor_mask = fl_mask;
fl_mask <<= 1;
/* Initialize the elements themselves, starting from the leaves. */
for (i = rcu_num_lvls - 1; i >= 0; i--) {
cpustride *= levelspread[i];
rnp = rsp->level[i];
for (j = 0; j < levelcnt[i]; j++, rnp++) {
raw_spin_lock_init(&ACCESS_PRIVATE(rnp, lock));
raw_spin_lock_init(&rnp->fqslock);
rnp->gpnum = rsp->gpnum;
rnp->completed = rsp->completed; //gpnum=completed idle
rnp->qsmask = 0;
rnp->qsmaskinit = 0;
rnp->grplo = j * cpustride; //grplo-grphi为负责的cpu范围
rnp->grphi = (j + 1) * cpustride - 1;
if (rnp->grphi >= nr_cpu_ids)
rnp->grphi = nr_cpu_ids - 1;
if (i == 0) { //根节点
rnp->grpnum = 0;
rnp->grpmask = 0;
rnp->parent = NULL;
} else {
rnp->grpnum = j % levelspread[i - 1]; //node在父节点的孩子们中的位置
rnp->grpmask = 1UL << rnp->grpnum; //grpnum的bitmask版本
rnp->parent = rsp->level[i - 1] +
j / levelspread[i - 1];
}
rnp->level = i;
INIT_LIST_HEAD(&rnp->blkd_tasks);
rcu_init_one_nocb(rnp); //init_swait_queue_head(&rnp->nocb_gp_wq[0|1]);
init_waitqueue_head(&rnp->exp_wq[0]);
init_waitqueue_head(&rnp->exp_wq[1]);
init_waitqueue_head(&rnp->exp_wq[2]);
init_waitqueue_head(&rnp->exp_wq[3]);
spin_lock_init(&rnp->exp_lock);
}
}
init_swait_queue_head(&rsp->gp_wq);
init_swait_queue_head(&rsp->expedited_wq);
rnp = rsp->level[rcu_num_lvls - 1];
for_each_possible_cpu(i) {
while (i > rnp->grphi)
rnp++;
per_cpu_ptr(rsp->rda, i)->mynode = rnp;
rcu_boot_init_percpu_data(i, rsp);
}
list_add(&rsp->flavors, &rcu_struct_flavors);
}
static void __init
rcu_boot_init_percpu_data(int cpu, struct rcu_state *rsp)
{
unsigned long flags;
struct rcu_data *rdp = per_cpu_ptr(rsp->rda, cpu);
struct rcu_node *rnp = rcu_get_root(rsp);
/* Set up local state, ensuring consistent view of global state. */
raw_spin_lock_irqsave_rcu_node(rnp, flags);
rdp->grpmask = 1UL << (cpu - rdp->mynode->grplo); //leaf node中孩子的位置
rdp->dynticks = &per_cpu(rcu_dynticks, cpu); //关联到percpu的rcu_dynticks, 所有flavor共享
rdp->cpu = cpu;
rdp->rsp = rsp;
rcu_boot_init_nocb_percpu_data(rdp);
raw_spin_unlock_irqrestore_rcu_node(rnp, flags);
}
static void __init rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
{
rdp->nocb_tail = &rdp->nocb_head;
init_swait_queue_head(&rdp->nocb_wq);
rdp->nocb_follower_tail = &rdp->nocb_follower_head;
}

cond_resched_rcu_qs

Once nohz_full is enabled, a CPU running a single task gets no tick and therefore never detects a qs, so the delayed gp can cause performance problems and high memory usage.

a quiescent state is one in which no kernel code can hold a reference to any RCU-protected data structure

Calling cond_resched() means the call site can be scheduled out, which also means it cannot be inside a read-side critical section, so a cond_resched() call implies a qs.
In tickless mode, however, cond_resched() may find that there is nothing else to schedule.
Hence cond_resched_rcu_qs() was introduced as a wrapper around cond_resched() that still reports the qs even when no reschedule actually happens.

#define rcu_note_voluntary_context_switch(t) rcu_all_qs()
/**
* cond_resched_rcu_qs - Report potential quiescent states to RCU
*
* This macro resembles cond_resched(), except that it is defined to
* report potential quiescent states to RCU-tasks even if the cond_resched()
* machinery were to be shut off, as some advocate for PREEMPT kernels.
*/
#define cond_resched_rcu_qs() \
do { \
if (!cond_resched()) \
rcu_note_voluntary_context_switch(current); \
} while (0)
void rcu_all_qs(void)
{
unsigned long flags;
barrier(); /* Avoid RCU read-side critical sections leaking down. */
if (unlikely(raw_cpu_read(rcu_sched_qs_mask))) {
local_irq_save(flags);
rcu_momentary_dyntick_idle();
local_irq_restore(flags);
}
if (unlikely(raw_cpu_read(rcu_sched_data.cpu_no_qs.b.exp))) {
/*
* Yes, we just checked a per-CPU variable with preemption
* enabled, so we might be migrated to some other CPU at
* this point. That is OK because in that case, the
* migration will supply the needed quiescent state.
* We might end up needlessly disabling preemption and
* invoking rcu_sched_qs() on the destination CPU, but
* the probability and cost are both quite low, so this
* should not be a problem in practice.
*/
preempt_disable();
rcu_sched_qs();
preempt_enable();
}
this_cpu_inc(rcu_qs_ctr);
barrier(); /* Avoid RCU read-side critical sections leaking up. */
}
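
The macro above is meant to be sprinkled into long-running kernel loops. A minimal sketch of such a caller (the table-scanning loop is hypothetical; the only point is the placement of cond_resched_rcu_qs() outside any read-side critical section):

#include <linux/rcupdate.h>

/* Hypothetical long-running kernel loop. */
static u64 sum_table(const u32 *tbl, unsigned long n)
{
	u64 sum = 0;
	unsigned long i;

	for (i = 0; i < n; i++) {
		sum += tbl[i];
		if (!(i % 1024))
			cond_resched_rcu_qs();	/* report a qs even if cond_resched() finds nothing to run */
	}
	return sum;
}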

RCU, cond_resched(), and performance regressions

Other references

Introduction to RCU
the only related source-code analysis I've found