一.概述:
coroutine是一个非常简单好用的协程库, 其依赖于ucontext, 可用于实现简单的协程应用. 本文对coroutine进行简单的分析
二.结构体分析:
coroutine在设计中分为两个组件, 一个是调度器 schedule
, 一个是协程调度实体 coroutine
, coroutine
用于标明协程实例的相关信息, schedule
用于对当前环境中的协程运行状态进行记录.
1
2
3
4
5
6
7
8
9
10
11// coroutine.c // 协程调度器 struct schedule { char stack[STACK_SIZE]; ucontext_t main; // 正在running的协程在执行完后需切换到的上下文,由于是非对称协程,所以该上下文用来接管协程结束后的程序控制权 int nco; // 调度器中已保存的协程数量 int cap; // 调度器中协程的最大容量 int running; // 调度器中正在running的协程id struct coroutine **co; // 连续内存空间,用于存储所有协程任务 };
在调度器结构体中, 包含了一个栈 schedule.stack
一个 main
上下文, 用于当running的协程执行完毕后接管程序控制权, 还有一个 coroutine*
数组, 用于保存当前所有的协程任务, co
的默认大小为16, 可以通过修改 DEFAULT_COROUTINE
进行修改
1
2
3
4
5
6
7
8
9
10
11
12
13// 协程任务类型 struct coroutine { coroutine_func func; // 协程函数 void *ud; // 协程函数的参数(用户数据) ucontext_t ctx; // 协程上下文 struct schedule * sch; // 协程所属的调度器 // ptrdiff_t定义在stddef.h(cstddef)中,通常被定义为long int类型,通常用来保存两个指针减法操作的结果. ptrdiff_t cap; // 协程栈的最大容量 ptrdiff_t size; // 协程栈的当前容量 int status; // 协程状态(COROUTINE_DEAD/COROUTINE_READY/COROUTINE_RUNNING/COROUTINE_SUSPEND) char *stack; // 协程栈 };
注意对于每一个协程都有一个 char* stack
的私有栈, 关于这个栈的相关问题, 后续会说明.
三.API函数:
1
2
3
4
5
6
7
8
9struct schedule * coroutine_open(void); // 创建协程调度器 void coroutine_close(struct schedule *); // 关闭协程调度器 int coroutine_new(struct schedule *, coroutine_func, void *ud); // 创建协程任务,将其加入调度器中 void coroutine_resume(struct schedule *, int id); // 恢复协程号为id的协程任务 int coroutine_status(struct schedule *, int id); // 根据协程任务id返回协程的当前状态 int coroutine_running(struct schedule *); // 返回调度器S中正在running的协程任务id void coroutine_yield(struct schedule *); // 保存当前上下文后中断当前协程的执行
四.流程:
下面我们通过一个例子来观察一下coroutine的整体流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40#define COROUTINE_DEAD 0 #define COROUTINE_READY 1 #define COROUTINE_RUNNING 2 #define COROUTINE_SUSPEND 3 struct args { int n; }; static void foo(struct schedule * S, void *ud) { struct args * arg = ud; int start = arg->n; int i; for (i=0;i<5;i++) { printf("coroutine %d : %dn",coroutine_running(S) , start + i); coroutine_yield(S); } } static void test(struct schedule *S) { struct args arg1 = { 0 }; struct args arg2 = { 100 }; int co1 = coroutine_new(S, foo, &arg1); int co2 = coroutine_new(S, foo, &arg2); printf("main startn"); while (coroutine_status(S,co1) && coroutine_status(S,co2)) { coroutine_resume(S,co1); coroutine_resume(S,co2); } printf("main endn"); } int main() { struct schedule * S = coroutine_open(); test(S); coroutine_close(S); return 0; }
当我们需要创建协程时我们需要调用 coroutine_open
函数创建并初始化一个协程调度器, 下面我们看一下 coroutine_open
都干了什么
1
2
3
4
5
6
7
8
9
10
11
12// 创建协程调度器schedule struct schedule * coroutine_open(void) { struct schedule *S = malloc(sizeof(*S)); // 从堆上为调度器分配内存空间 S->nco = 0; // 初始化调度器的当前协程数量 S->cap = DEFAULT_COROUTINE; // 初始化调度器的最大协程数量 S->running = -1; S->co = malloc(sizeof(struct coroutine *) * S->cap); // 为调度器中的协程分配存储空间 memset(S->co, 0, sizeof(struct coroutine *) * S->cap); return S; }
这个函数主要是分配一个调度器实体, 并进行初始化.
创建完 Schedule
后, 调用 test()
函数, 在 test()
函数中首先调用 coroutine_new
函数创建了两个协程实例, 下面我们看一下 coroutine_new
做了哪些事情:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// 创建协程任务、并将其加入调度器中 int coroutine_new(struct schedule *S, coroutine_func func, void *ud) { // 创建协程任务(分配内存空间)并初始化 struct coroutine *co = _co_new(S, func , ud); // 将协程任务co加入调度器S,并返回该协程任务的id if (S->nco >= S->cap) { // 调整调度器S中协程的最大容量,然后将协程任务co加入调度器S,并返回该协程任务的id int id = S->cap; S->co = realloc(S->co, S->cap * 2 * sizeof(struct coroutine *)); memset(S->co + S->cap , 0 , sizeof(struct coroutine *) * S->cap); S->co[S->cap] = co; S->cap *= 2; ++S->nco; return id; } else { // 将协程任务co加入调度器S,并返回该协程任务的id int i; for (i=0;i<S->cap;i++) { int id = (i+S->nco) % S->cap; //注意这里的循环顺序 if (S->co[id] == NULL) { //从当前nco开始,找到第一个为NULL的地方 S->co[id] = co; ++S->nco; return id; } } } assert(0); return -1; }
这个函数主要做了两件事:
- 首先创建一个协程任务(分配内存空间)并初始化 [调用
_co_new
函数] - 其次将该协程结构体加入到
shedule.co
数组中
其中_co_new
函数如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 创建协程任务(分配内存空间)并初始化 struct coroutine * _co_new(struct schedule *S , coroutine_func func, void *ud) { struct coroutine * co = malloc(sizeof(*co)); co->func = func; // 初始化协程函数 co->ud = ud; // 初始化用户数据 co->sch = S; // 初始化协程所属的调度器 co->cap = 0; // 初始化协程栈的最大容量 co->size = 0; // 初始化协程栈的当前容量 co->status = COROUTINE_READY; // 初始化协程状态 co->stack = NULL; // 初始化协程栈 return co; }
可以看到每一个初始化的协程任务结构体, 其状态为 COROUTINE_READY
(就绪). 且其私有Stack设置为空.
下面继续回到 test()
函数 , 当我们创建了两个协程任务后, 开始进行循环, 调用 int coroutine_status(struct schedule * S, int id)
从 S
中获得指定 id
对应的协程实例的状态, 如果两个协程任务的状态都不为 COROUTINE_DEAD
, 则可以进行调度.
void coroutine_resume(struct schedule * S, int id)
函数用于恢复协程号为id的协程任务, 具体实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 恢复协程号为id的协程任务 void coroutine_resume(struct schedule * S, int id) { assert(S->running == -1); assert(id >=0 && id < S->cap); struct coroutine *C = S->co[id]; //获取当前id对应的协程结构体 if (C == NULL) return; int status = C->status; //获取statue switch(status) { case COROUTINE_READY: //READY就绪可被调度 getcontext(&C->ctx); // 获取程序当前上下文 C->ctx.uc_stack.ss_sp = S->stack; // 设置上下文C->ctx的栈 C->ctx.uc_stack.ss_size = STACK_SIZE; // 设置上下文C->ctx的栈容量 C->ctx.uc_link = &S->main; // 设置上下文C->ctx执行完后恢复到S->main上下文, 否则当前线程因没有上下文可执行而退出 S->running = id; C->status = COROUTINE_RUNNING; uintptr_t ptr = (uintptr_t)S; makecontext(&C->ctx, (void (*)(void)) mainfunc, 2, (uint32_t)ptr, (uint32_t)(ptr>>32)); // 修改上下文C->ctx, 新的上下文中执行函数mainfunc swapcontext(&S->main, &C->ctx); // 保持当前上下文到S->main, 切换当前上下文为C->ctx break; case COROUTINE_SUSPEND: memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size); // 拷贝协程栈C->stack到S->stack S->running = id; // 设置当前运行的协程id C->status = COROUTINE_RUNNING; // 修改协程C的状态 swapcontext(&S->main, &C->ctx); // 保存当前上下文到S->main, 切换当前上下文为C->ctx break; default: assert(0); } }
如果你的状态是 READY
说明是第一次被调度, 首先会调用getcontext
函数获取当前的上下文, 然后会将该协程设置上下文对应的栈, 设置为 S->stack
, 接着调用makecontext
函数, 将 C->ctx
上下文与 mainfunc
函数进行绑定, 并把指向S
的指针作为参数传递给mainfunc(void*)
.
下面调用 swapcontext(S->main,C->ctx)
, 将当前的上下文保存在 S->main
中, 并切换到 C->ctx
中, 也就是 mainfunc
函数 , 下面我们看一下这个函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14// 所有新协程第一次执行时的入口函数(其中执行协程,并处理善后工作等) static void mainfunc(uint32_t low32, uint32_t hi32) { uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32); struct schedule *S = (struct schedule *)ptr; int id = S->running; struct coroutine *C = S->co[id]; C->func(S,C->ud); //调用func _co_delete(C); //执行完毕后删除C协程 S->co[id] = NULL; --S->nco; S->running = -1; }
这个函数是一个过渡函数, 在这个函数中调用真正的工作函数 C->func
, 执行完毕后删除协程. 我们绑定的执行函数为 foo()
函数, foo函数就是打印一个数, 然后调用 coroutine_yield
函数,中断当前协程的执行.
1
2
3
4
5
6
7
8
9
10
11
12
13// 保存上下文后中断当前协程的执行,然后由调度器中的main上下文接管程序执行权 void coroutine_yield(struct schedule * S) { int id = S->running; assert(id >= 0); struct coroutine * C = S->co[id]; assert((char *)&C > S->stack); _save_stack(C,S->stack + STACK_SIZE); // 保存协程栈 C->status = COROUTINE_SUSPEND; // 修改协程状态 S->running = -1; // 修改当前执行的协程id为-1 swapcontext(&C->ctx , &S->main); // 保存当前协程的上下文到C->ctx, 切换当前上下文到S->main }
在这个函数中, 最重要的就是 _save_stack
保存协程栈, 参数 top
为 S->stack+STACK_SIZE
也就是栈底.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15static void _save_stack(struct coroutine *C, char *top) { char dummy = 0; assert(top - &dummy <= STACK_SIZE); if (C->cap < top - &dummy) { // 为协程栈分配内存空间 free(C->stack); C->cap = top-&dummy; C->stack = malloc(C->cap); } C->size = top - &dummy; memcpy(C->stack, &dummy, C->size); // TODO - 不是很明白为什么这种方式可以保存协程栈 }
这里的dummy是一个占位符, 用于获取当前栈使用的地址, top - &dummy
表示剩余空间大小, 因为存不下了. 重新调整栈的大小. 然后调用 memcpy
函数将 C栈的内容, 拷贝到起始位置. 无论是重新分配了, 还是没有都没有关系的.
执行完 _save_stack
后, 将该协程状态设置为 SUSPEND
暂停状态, 让切换为 S->main
上下文, 此时又回到了 coroutine_resume
函数中, 执行结束后, 会第二次调用 coroutine_resume
函数, 创建第二个协程, 与上面一样, 当打印出第一个字符串后, 退回 到corountine_resume
中, 然后继续 where 循环, 但此时在进入 corountine_resume
函数时, 协程的状态已经变成了SUSPEND
, 进入第二个switch语句, 将c->stack
拷贝到 S->stack
中(拷贝在后面). 然后交换上下文, 继续执行协程.
最后
以上就是敏感皮皮虾最近收集整理的关于coroutine源码分析的全部内容,更多相关coroutine源码分析内容请搜索靠谱客的其他文章。
发表评论 取消回复