Linux epoll implementation

epoll_create

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep); // allocate and initialize the eventpoll
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); // allocate an fd in the process's descriptor table
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, // create the file backed by an anonymous inode; all such files share one inode to save space, since epoll needs no inode operations
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file); // install the file into the process's open-file table
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
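
A minimal userspace counterpart, for illustration only (not from the source): EPOLL_CLOEXEC is the only flag epoll_create1() accepts, matching the flags check above.

#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>

int main(void)
{
    /* Equivalent to the legacy epoll_create(size) for any size > 0,
     * since the size argument is otherwise ignored (see the wrapper above). */
    int epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (epollfd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }
    printf("epoll instance: fd %d\n", epollfd);
    return 0;
}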

epoll_ctl

Adding a listen socket

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev);

epoll_ctl

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
ep = f.file->private_data;
mutex_lock_nested(&ep->mtx, 0);
if (op == EPOLL_CTL_ADD) {
if (!list_empty(&f.file->f_ep_links) || // this epoll is itself being watched by another epoll
is_file_epoll(tf.file)) { // or the fd being added is itself an epoll fd
// handle epoll nesting, or reject loops and other error cases
...
}
}
epi = ep_find(ep, tf.file, fd); // look the file up in ep's rbtree
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP; // POLLERR and POLLHUP are always reported, on top of the requested events (e.g. POLLIN)
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
}
} else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx);
...
}

As the code above shows, an epoll instance can itself be watched by another epoll. Every epoll instance owns an rbtree used to quickly find the watch item (epi) for a given file; the tree is keyed by the file pointer and the fd.
epoll_ctl then dispatches on whether an epi already exists for the file and on the requested op, calling ep_insert, ep_remove, or ep_modify accordingly.
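
For reference, the rbtree key and its comparison helper in fs/eventpoll.c are essentially:

struct epoll_filefd {
    struct file *file;
    int fd;
} __packed;

/* Compare RB tree keys: file pointer first, then fd */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
                             struct epoll_filefd *p2)
{
    return (p1->file > p2->file ? +1 :
            (p1->file < p2->file ? -1 : p1->fd - p2->fd));
}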

ep_insert

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) //分配epitem
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = ep_item_poll(epi, &epq.pt); // poll the target file for current events; on the first ADD of a listen socket with no pending connections this returns 0
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links); // f_ep_links links every epitem watching this file
spin_unlock(&tfile->f_lock);
ep_rbtree_insert(ep, epi); // insert into the rbtree, keyed by file pointer and fd
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist); // events already pending: put it on ep's ready list right away
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
}

epoll data structures
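
The relationships among the core objects can be read off the structures themselves; abridged from fs/eventpoll.c of the same vintage (many fields omitted):

struct eventpoll {
    spinlock_t lock;             /* protects rdllist and ovflist */
    struct mutex mtx;            /* serializes epoll_ctl against event transfer */
    wait_queue_head_t wq;        /* tasks sleeping in epoll_wait() */
    wait_queue_head_t poll_wait; /* used when this epoll is nested in another */
    struct list_head rdllist;    /* ready list of epitems with pending events */
    struct rb_root rbr;          /* rbtree of all watched epitems */
    struct epitem *ovflist;      /* overflow chain used while copying events out */
    struct file *file;           /* the anonymous-inode file for this instance */
};

struct epitem {
    struct rb_node rbn;          /* node in eventpoll->rbr */
    struct list_head rdllink;    /* link in eventpoll->rdllist */
    struct epitem *next;         /* link in eventpoll->ovflist */
    struct epoll_filefd ffd;     /* the watched (file, fd) pair: the rbtree key shown earlier */
    struct list_head pwqlist;    /* eppoll_entry hooks queued on wait queues */
    struct eventpoll *ep;        /* owning epoll instance */
    struct list_head fllink;     /* link in file->f_ep_links */
    struct epoll_event event;    /* the user's requested mask and data */
};

struct eppoll_entry {
    struct list_head llink;      /* link in epitem->pwqlist */
    struct epitem *base;         /* the epitem this hook belongs to */
    wait_queue_t wait;           /* entry queued on the target's wait queue */
    wait_queue_head_t *whead;    /* that wait queue head, e.g. sk_sleep(sk) */
};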

epoll_wait

for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &local, &addrlen);
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,
&ev) == -1) {
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
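
The setnonblocking() helper in the loop above is not defined in the excerpt; a common implementation (an assumption, not from the source) uses fcntl:

#include <fcntl.h>

/* Put an fd into non-blocking mode; required for EPOLLET so the
 * handler can safely read until EAGAIN. */
static int setnonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}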

sys_epoll_wait does little more than validate its arguments before calling ep_poll, which does the real work.
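
Roughly, the syscall wrapper looks like this (abridged from fs/eventpoll.c of the same vintage):

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
                int, maxevents, int, timeout)
{
    int error;
    struct fd f;
    struct eventpoll *ep;

    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;
    /* verify the user buffer is writable */
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
        return -EFAULT;
    f = fdget(epfd);
    if (!f.file)
        return -EBADF;
    error = -EINVAL;
    if (!is_file_epoll(f.file)) /* epfd must really be an epoll fd */
        goto error_fput;
    ep = f.file->private_data;
    error = ep_poll(ep, events, maxevents, timeout); /* the real work */
error_fput:
    fdput(f);
    return error;
}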

ep_poll

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
if (timeout > 0) {
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
} else if (timeout == 0) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}
fetch_events:
spin_lock_irqsave(&ep->lock, flags);
if (!ep_events_available(ep)) { // no events pending: block and wait
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait); // add ourselves to ep's wait queue
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
if (ep_events_available(ep) || timed_out) // re-check for events before actually sleeping
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) // sleep until woken or the timeout expires
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
}
// woken by an event, by the timeout, or by a signal
__remove_wait_queue(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out) // copy events to userspace
goto fetch_events;
return res;
}
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}

ep_scan_ready_list

static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
if (!ep_locked)
mutex_lock_nested(&ep->mtx, depth);
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
list_splice_init(&ep->rdllist, &txlist); // steal rdllist into the local txlist
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
error = (*sproc)(ep, &txlist, priv); // ep_send_events_proc
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
// events fired (via ep_poll_callback) while sproc was running: move them onto the ready list
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
list_splice(&txlist, &ep->rdllist); // e.g. LT-mode items, or items that could not be delivered, go back on the ready list
__pm_relax(ep->ws);
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
if (!ep_locked)
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}

ep_send_events_proc

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
epi = list_first_entry(head, struct epitem, rdllink);
ws = ep_wakeup_source(epi);
if (ws) {
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink); // unlink from the (stolen) ready list
revents = ep_item_poll(epi, &pt); // re-poll the file for its current events
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) { // events present: copy them to userspace
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head); // copy to userspace failed: put the item back
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) { // LT mode
list_add_tail(&epi->rdllink, &ep->rdllist); // LT mode: requeue on the ready list for the next epoll_wait
ep_pm_stay_awake(epi);
}
}
}
return eventcnt;
}

ep_item_poll

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
pt->_key = epi->event.events;
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events; //socket_file_ops->sock_poll
}
/* No kernel lock held - perfect */
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
unsigned int busy_flag = 0;
struct socket *sock;
sock = file->private_data;
if (sk_can_busy_loop(sock->sk)) {
/* this socket can poll_ll so tell the system call */
busy_flag = POLL_BUSY_LOOP;
/* once, only if requested by syscall */
if (wait && (wait->_key & POLL_BUSY_LOOP))
sk_busy_loop(sock->sk, 1);
}
return busy_flag | sock->ops->poll(file, sock, wait); // inet_stream_ops->tcp_poll
}
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
unsigned int mask;
struct sock *sk = sock->sk;
const struct tcp_sock *tp = tcp_sk(sk);
int state;
sock_rps_record_flow(sk);
sock_poll_wait(file, sk_sleep(sk), wait); // hook into the poll framework: via ep_ptable_queue_proc this queues ep_poll_callback on sk_sleep(sk)
state = sk_state_load(sk);
if (state == TCP_LISTEN)
return inet_csk_listen_poll(sk); // for a listen socket: report whether connections are waiting to be accepted
/* Socket is not locked. We are protected from async events
* by poll logic and correct handling of state changes
* made by other threads is impossible in any case.
*/
mask = 0;
/*
* POLLHUP is certainly not done right. But poll() doesn't
* have a notion of HUP in just one direction, and for a
* socket the read side is more interesting.
*
* Some poll() documentation says that POLLHUP is incompatible
* with the POLLOUT/POLLWR flags, so somebody should check this
* all. But careful, it tends to be safer to return too many
* bits than too few, and you can easily break real applications
* if you don't tell them that something has hung up!
*
* Check-me.
*
* Check number 1. POLLHUP is _UNMASKABLE_ event (see UNIX98 and
* our fs/select.c). It means that after we received EOF,
* poll always returns immediately, making impossible poll() on write()
* in state CLOSE_WAIT. One solution is evident --- to set POLLHUP
* if and only if shutdown has been made in both directions.
* Actually, it is interesting to look how Solaris and DUX
* solve this dilemma. I would prefer, if POLLHUP were maskable,
* then we could set it on SND_SHUTDOWN. BTW examples given
* in Stevens' books assume exactly this behaviour, it explains
* why POLLHUP is incompatible with POLLOUT. --ANK
*
* NOTE. Check for TCP_CLOSE is added. The goal is to prevent
* blocking on fresh not-connected or disconnected socket. --ANK
*/
if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE)
mask |= POLLHUP; // POLLHUP covers both directions; after only the peer's FIN, the local side can still send
if (sk->sk_shutdown & RCV_SHUTDOWN) // the peer's FIN has been received
mask |= POLLIN | POLLRDNORM | POLLRDHUP; // POLLIN and POLLRDHUP both signal the FIN; POLLRDHUP is specific to the read direction
/* Connected or passive Fast Open socket? */
if (state != TCP_SYN_SENT &&
(state != TCP_SYN_RECV || tp->fastopen_rsk)) {
int target = sock_rcvlowat(sk, 0, INT_MAX);
if (tp->urg_seq == tp->copied_seq &&
!sock_flag(sk, SOCK_URGINLINE) &&
tp->urg_data)
target++;
if (tp->rcv_nxt - tp->copied_seq >= target)
mask |= POLLIN | POLLRDNORM;
if (!(sk->sk_shutdown & SEND_SHUTDOWN)) { // our send direction is still open, so we can still write
if (sk_stream_is_writeable(sk)) {
mask |= POLLOUT | POLLWRNORM; // mark the socket writable
} else { /* send SIGIO later */
sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
/* Race breaker. If space is freed after
* wspace test but before the flags are set,
* IO signal will be lost. Memory barrier
* pairs with the input side.
*/
smp_mb__after_atomic();
if (sk_stream_is_writeable(sk))
mask |= POLLOUT | POLLWRNORM;
}
} else // with the send side shut down, still report writable so applications do not block
mask |= POLLOUT | POLLWRNORM; //https://github.com/torvalds/linux/commit/d84ba638e4ba3c40023ff997aa5e8d3ed002af36
if (tp->urg_data & TCP_URG_VALID)
mask |= POLLPRI;
}
/* This barrier is coupled with smp_wmb() in tcp_reset() */
smp_rmb();
if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
mask |= POLLERR;
return mask;
}

sock_poll_wait

sock_poll_wait -> poll_wait -> ep_ptable_queue_proc: this chain allocates an eppoll_entry carrying the event callback ep_poll_callback, and links it to both the epi and sk_sleep(sk).

static inline void sock_poll_wait(struct file *filp,
wait_queue_head_t *wait_address, poll_table *p)
{
if (!poll_does_not_wait(p) && wait_address) {
poll_wait(filp, wait_address, p);
/* We need to be sure we are in sync with the
* socket flags modification.
*
* This memory barrier is paired in the wq_has_sleeper.
*/
smp_mb();
}
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p); //ep_ptable_queue_proc
}
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

Event delivery and callbacks

struct sock {
...
wait_queue_head_t *sk_sleep;
void (*sk_state_change)(struct sock *sk); // state change: sock_def_wakeup
void (*sk_data_ready)(struct sock *sk); // data readable: sock_def_readable
void (*sk_write_space)(struct sock *sk); // write space available: sock_def_write_space, sk_stream_write_space
void (*sk_error_report)(struct sock *sk); // error: sock_def_error_report
};

When the protocol stack reaches one of these states, it invokes the corresponding callback. Take sk_data_ready as the example: it is called when TCP receives data, and it ends up invoking ep_poll_callback, which tcp_poll earlier queued on sk_sleep(sk).

static void sock_def_readable(struct sock *sk)
{
struct socket_wq *wq;
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq); // sk_sleep(sk)
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI | //ep_poll_callback
POLLRDNORM | POLLRDBAND);
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
rcu_read_unlock();
}
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
int ewake = 0;
if ((unsigned long)key & POLLFREE) {
ep_pwq_from_wait(wait)->whead = NULL;
list_del_init(&wait->task_list);
}
spin_lock_irqsave(&ep->lock, flags);
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* If we are transferring events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happen during that period of time are
* chained in ep->ovflist and requeued later on.
*/
// ep_scan_ready_list may be copying events to userspace at this moment
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi; // chain the notified epi onto ep->ovflist; ep_scan_ready_list processes this list once the copy is done
if (epi->ws) {
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}
if (!ep_is_linked(&epi->rdllink)) { // add to the ready list only if not already there
list_add_tail(&epi->rdllink, &ep->rdllist); // e.g. ten arriving packets do not create ten entries
ep_pm_stay_awake_rcu(epi);
}
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!((unsigned long)key & POLLFREE)) {
switch ((unsigned long)key & EPOLLINOUT_BITS) {
case POLLIN:
if (epi->event.events & POLLIN)
ewake = 1;
break;
case POLLOUT:
if (epi->event.events & POLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up_locked(&ep->wq); // wake the task sleeping in ep_poll
}
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
if (epi->event.events & EPOLLEXCLUSIVE)
return ewake;
return 1;
}

ET vs. LT

  • As the analysis above shows, epoll_wait ultimately copies events to userspace in ep_send_events_proc; in LT mode the epitem is then put back on the ready list.
  • In LT mode, a second epoll_wait call therefore does not block: the epitem is still on the ready list, and ep_send_events_proc re-polls the file (tcp_poll) for its current state; only if events are still pending does the item go back on the ready list again.
  • The ET/LT difference shows up on these subsequent calls: with many watched sockets, an LT ready list can grow long and traversing it costs performance. ET avoids that, but obliges the application to drain each fd, as sketched below.
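
Because ET reports only readiness changes, the do_use_fd() in the epoll_wait example must drain the descriptor until EAGAIN. A minimal sketch (illustrative, not from the source; assumes the fd was made non-blocking as above):

#include <errno.h>
#include <unistd.h>

/* Minimal ET-style handler: drain until EAGAIN, because data left
 * unread will not re-trigger an edge-triggered notification. */
static void do_use_fd(int fd)
{
    char buf[4096];

    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            /* process buf[0..n) -- application specific */
        } else if (n == 0) {
            close(fd); /* peer closed; close() also drops the epoll watch */
            return;
        } else if (errno == EINTR) {
            continue; /* interrupted by a signal: retry */
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return; /* fully drained: wait for the next edge */
        } else {
            close(fd); /* hard error */
            return;
        }
    }
}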

An obvious question: can the listen socket use ET?

nginx uses LT mode for the listen socket and ET mode for accepted connections.
Note that nginx relies on accept_mutex or SO_REUSEPORT to avoid the thundering-herd problem.
With accept_mutex and multi_accept=off, a worker does not accept every pending connection after epoll_wait returns;
this balances new connections across worker processes. But when another worker then calls epoll_wait on listen_sock, under ET the event would no longer be on the ready list, so this scheme only works in LT mode.