我是靠谱客的博主 感动金鱼,这篇文章主要介绍epoll源码解析(2) epoll_ctl引言总结,现在分享给大家,希望可以做个参考。

示例代码内核版本为2.6.38

epoll源码解析(1) epoll_create
epoll源码解析(2) epoll_ctl
epoll源码解析(3) epoll_wait

引言

上一篇文章中分析的epoll的重要数据结构和epoll_create的实现,如果说上篇文章是理解epoll的基础,那这篇文章就是解释为何epoll如此高效的原因,我们来一起看看吧!

在这里插入图片描述

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/* * epfd 为epoll_create创建的fd * op 是我们的操作种类 增删改 ADD DEL MOD * fd 当然是我们要加入的fd了 * event 我们所关心的事件类型 注意只有我们注册的事件才会在epoll_wait被唤醒后传递到用户空间 否则虽然内核可以收到 但不会传递到用户空间 */ SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user *, event) { int error; int did_lock_epmutex = 0; struct file *file, *tfile; struct eventpoll *ep; struct epitem *epi; struct epoll_event epds; error = -EFAULT; /* static inline int ep_op_has_event(int op) { return op != EPOLL_CTL_DEL; // DEL的时候当然不用从用户态拷贝event啦 } */ if (ep_op_has_event(op) && //我们可以在上面看到ep_op_has_event的函数体 用以判断是否需要从用户态拷贝数据 copy_from_user(&epds, event, sizeof(struct epoll_event))) //从用户空间把数据拷贝过来 goto error_return; /* Get the "struct file *" for the eventpoll file */ error = -EBADF; file = fget(epfd); //得到epoll的file指针 if (!file) goto error_return; /* Get the "struct file *" for the target file */ tfile = fget(fd); //得到要添加fd的file指针 if (!tfile) goto error_fput; /* The target file descriptor must support poll */ error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) //如果要监听的fd不支持poll的话就没办法了 只能报error了. goto error_tgt_fput; /* * We have to check that the file structure underneath the file descriptor * the user passed to us _is_ an eventpoll file. And also we do not permit * adding an epoll file descriptor inside itself. */ error = -EINVAL; /* /* static inline int is_file_epoll(struct file *f) { return f->f_op == &eventpoll_fops; //更快的检测fd是否为一个epoll_fd } */ if (file == tfile || !is_file_epoll(file)) //当然不允许自己监听自己了,同时得保证epfd是一个epoll的fd goto error_tgt_fput; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = file->private_data; //上一篇说到private_data中存的是分配好的eventpoll /* * When we insert an epoll file descriptor, inside another epoll file * descriptor, there is the change of creating closed loops, which are * better be handled here, than in more critical paths. * * We hold epmutex across the loop check and the insert in this case, in * order to prevent two separate inserts from racing and each doing the * insert "at the same time" such that ep_loop_check passes on both * before either one does the insert, thereby creating a cycle. */ if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { //防止epoll插入到另一个epoll中 mutex_lock(&epmutex); did_lock_epmutex = 1; error = -ELOOP; if (ep_loop_check(ep, tfile) != 0) goto error_tgt_fput; } mutex_lock(&ep->mtx); //对fd进行操作时上mtx这把锁 /* * Try to lookup the file inside our RB tree, Since we grabbed "mtx" * above, we can be sure to be able to use the item looked up by * ep_find() till we release the mutex. */ epi = ep_find(ep, tfile, fd); //O(logn)查询要操作的fd是否已经存在 /* static inline void ep_set_ffd(struct epoll_filefd *ffd, struct file *file, int fd) //创建一个进行比较的项 { ffd->file = file; ffd->fd = fd; } static inline int ep_cmp_ffd(struct epoll_filefd *p1, struct epoll_filefd *p2) { return (p1->file > p2->file ? +1: (p1->file < p2->file ? -1 : p1->fd - p2->fd)); } //从RB-tree中寻找期望插入的fd是否存在 static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd) { int kcmp; struct rb_node *rbp; struct epitem *epi, *epir = NULL; struct epoll_filefd ffd; ep_set_ffd(&ffd, file, fd); for (rbp = ep->rbr.rb_node; rbp; ) { epi = rb_entry(rbp, struct epitem, rbn); kcmp = ep_cmp_ffd(&ffd, &epi->ffd); //红黑树的排序规则 if (kcmp > 0) rbp = rbp->rb_right; else if (kcmp < 0) rbp = rbp->rb_left; else { epir = epi; break; } } return epir; //如果找到返回找到的fd所对应的节点 } */ error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; //我们可以看到这两个事件显然我们也没有必要注册 error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; } mutex_unlock(&ep->mtx); error_tgt_fput: if (unlikely(did_lock_epmutex)) mutex_unlock(&epmutex); fput(tfile); //正常关闭文件 error_fput: fput(file); error_return: return error; }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
//注意 这个整个函数调用都是上锁的 static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) { int error, revents, pwake = 0; unsigned long flags; long user_watches; struct epitem *epi; struct ep_pqueue epq; //是否到达最大的监听数 user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) //最有意思的地方 slab机制,在缓存中分配对象,为了更快的访问 return -ENOMEM; /* Item initialization follow here ... */ //正如内核注释所言 正常的初始化 INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0; epi->next = EP_UNACTIVE_PTR; /* Initialize the poll table using the queue callback */ epq.epi = epi; //设置poll的回调为ep_ptable_queue_proc,这其中藏着epoll如此高效的原因! init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. Note that after * this operation completes, the poll callback can start hitting * the new item. */ //进行完这一步 算是把epitem和这个fd连接起来了 会在事件到来的时候执行上面注册的回调 revents = tfile->f_op->poll(tfile, &epq.pt); /* * We have to check if something went wrong during the poll wait queue * install process. Namely an allocation for a wait queue failed due * high memory pressure. */ error = -ENOMEM; if (epi->nwait < 0) goto error_unregister; /* Add the current item to the list of active epoll hook for this file */ spin_lock(&tfile->f_lock); //tfile是要加入的fd的file指针 list_add_tail(&epi->fllink, &tfile->f_ep_links); //file会把监听自己的epitem连接起来 spin_unlock(&tfile->f_lock); /* * Add the current item to the RB tree. All RB tree operations are * protected by "mtx", and ep_insert() is called with "mtx" held. */ ep_rbtree_insert(ep, epi); //向ep的红黑树中插入一个epitem /* We have to drop the new item inside our item list to keep track of it */ spin_lock_irqsave(&ep->lock, flags); /* If the file is already "ready" we drop it inside the ready list */ //这里处理的是监听的fd刚好有事件发生,如果现在不处理,可能后面不会到来数据,也就刽触发回调,它们可能就再也不会被处理了 if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); //唤醒epoll_wait中的等待队列 if (waitqueue_active(&ep->poll_wait)) pwake++; } spin_unlock_irqrestore(&ep->lock, flags); atomic_long_inc(&ep->user->epoll_watches); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return 0; error_unregister: ep_unregister_pollwait(ep, epi); /* * We need to do this because an event could have been arrived on some * allocated wait queue. Note that we don't care about the ep->ovflist * list, since that is used/cleaned only inside a section bound by "mtx". * And ep_insert() is called with "mtx" held. */ spin_lock_irqsave(&ep->lock, flags); if (ep_is_linked(&epi->rdllink)) list_del_init(&epi->rdllink); spin_unlock_irqrestore(&ep->lock, flags); kmem_cache_free(epi_cache, epi); return error; }

我们来看看poll被调用时执行的回调ep_ptable_queue_proc

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq; /* struct eppoll_entry { //这个结构体就是做一个epitem和其回调之间的关联 // List header used to link this structure to the "struct epitem" struct list_head llink; // The "base" pointer is set to the container "struct epitem" struct epitem *base; //Wait queue item that will be linked to the target file wait //queue head. wait_queue_t wait; // The wait queue head that linked the "wait" wait queue item wait_queue_head_t *whead; }; */ if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { //初始化一个等待队列的节点 其中注册的回调为ep_poll_callback init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; //这个就是监控的fd的等待队列头 pwq->base = epi; add_wait_queue(whead, &pwq->wait);//把初始化的等待对列头插入到所监控的fd的等待对列中,稍后调用 list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++;//记录了poll执行回调的次数,即数据来了几次 } else { /* We have to signal that an error occurred */ epi->nwait = -1; } }

显然 全文的重点就是那个加入到所监听fd等待队列中的回调ep_poll_callback,我们来一睹它的真容吧!

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) //key是events { int pwake = 0; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); //由等待队列中获取对应的epitem struct eventpoll *ep = epi->ep; //从而得到epoll的结构 spin_lock_irqsave(&ep->lock, flags); //这里要操作rdllist 上锁 /* * If the event mask does not contain any poll(2) event, we consider the * descriptor to be disabled. This condition is likely the effect of the * EPOLLONESHOT bit that disables the descriptor when an event is received, * until the next EPOLL_CTL_MOD will be issued. */ //#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET) if (!(epi->event.events & ~EP_PRIVATE_BITS)) //我们看到如果所监控的fd没有注册什么事件的话,就不加入rdllist中 goto out_unlock; /* * Check the events coming with the callback. At this stage, not * every device reports the events in the "key" parameter of the * callback. We need to be able to handle both cases here, hence the * test for "key" != NULL before the event match test. */ if (key && !((unsigned long) key & epi->event.events)) //所触发的事件我们没有注册的话当然也不加入rdllist中 goto out_unlock; /* * If we are trasfering events to userspace, we can hold no locks * (because we're accessing user memory, and because of linux f_op->poll() * semantics). All the events that happens during that period of time are * chained in ep->ovflist and requeued later on. */ /* * 这里就比较有意思了 * 在epoll_wait中EP_UNACTIVE_PTR初始化为EP_UNACTIVE_PTR,而当epoll_wait]中被唤醒后处理rdlist时会将ep->ovflist置为NULL * 也就是说如果ep->ovflist != EP_UNACTIVE_PTR意味这epoll_wait已被唤醒 正在执行loop * 此时我们就把在rdllist遍历时发生的事件用ovflist串起来,在遍历结束后插入rdllist中 */ if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) { if (epi->next == EP_UNACTIVE_PTR) { epi->next = ep->ovflist; ep->ovflist = epi; } goto out_unlock; } /* If this file is already in the ready list we exit soon */ //将当前的epitem加入到rdllist中 //这就是epoll如此高效的原因,我们不必每次去遍历fd寻找触发的事件,触发事件时会触发回调自动把epitem加入到rdllist中, //这使得复杂度从O(N)降到了O(有效事件集合),且我们不必每次注册事件,仅在epoll_ctl(ADD)中注册一次即可(修改除外), if (!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink, &ep->rdllist); /* * Wake up ( if active ) both the eventpoll wait list and the ->poll() * wait list. */ //唤醒epoll_wait if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; out_unlock: spin_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ if (pwake) ep_poll_safewake(&ep->poll_wait); return 1; }

总结

epoll_ctl我们调了ep_insert来讲,这是因为其基本道出了epoll如此高效的原因,首先是epitem的创建使用了slab机制,利用缓存来加速我们的操作,其次是红黑树,这使得我们的增删改操作的复杂度均为O(logn),最重要的一点就是回调的设计,epoll会在插入一个节点的时候创建一个epoll_entry,然后在其中注册ep_ptable_queue_proc,fd会在poll的时候执行这个回调,同时调用ep_poll_callback,这使得被监控的fd被触发时直接就把epitem加入到rdllist中,这样rdllist中就全部都是就绪的fd了! 在有大量不活跃fd时有着显著的性能提升,除此之外我们还不必每次注册事件,这些都是在epitem中保存好的.

可以说epoll的精髓就在于这个神奇的回调.

最后

以上就是感动金鱼最近收集整理的关于epoll源码解析(2) epoll_ctl引言总结的全部内容,更多相关epoll源码解析(2)内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部