sylar库学习线程与协程
知识点
thread_local关键字,这个关键字在我理解看来是一个类似于static的关键字,但区别在于这个关键字所描述的数据的划分是线程,在单个线程内它与static没有区别,而不同的线程中thread_local关键字是独立的每个线程都有一份单独的,互不干扰的数据。因此这一关键字是构造多线程调度的关键,在线程类,协程类,调度器类都需要用到。
其中线程和线程执行其他函数之前,都是用一个函数将其他函数包装起来,以此来设置函数执行之前的参数
线程
由于使用的是Linux作为操作系统,要将程序中所得到的线程号与系统中使用ps命令查出来的编号相对应,最好使用linux系统中自带的库。也就是pthread
线程类主要是包装了
pthread_create(thread)
pthread_detach(~thread)
pthread_join(join)
pthread_setname_np(run)
这几个函数,这些都是C语言风格的函数,以pthread_t数据结构来进行函数的控制。
其中为了构造函数之前
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26Thread::Thread(std::function<void()> cb, const std::string &name):m_cb(cb),m_name(name) { if (name.empty()) { m_name ="UNKNOW"; } int rt = pthread_create(&m_thread, nullptr, &Thread::run, this); //线程创建,此处开始就立即执行 // 线程可能在构造函数返回之前就跑起来 if (rt) { SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt << "name= " << name; throw std::logic_error("pthread_create error");//逻辑错误由于程序内部逻辑而导致的错误 } m_semaphore.wait();//可以等到跑起来在唤醒 } void *Thread::run(void *arg) {//线程函数参数必须是void* POSIX线程库标准,run为静态成员,this指针必须显式传入 Thread* thread = (Thread *)arg; t_thread = thread; t_thread_name = thread->m_name; thread->m_id = sylar::GetThreadId(); pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());//为线程设置唯一名称 std::function<void()> cb; cb.swap(thread->m_cb); thread->m_semaphore.notify();//线程执行到这的时候析构函数才能结束 cb(); return 0; }
通过加了一个锁保证了执行顺序的问题,构造函数一直在等待,知道设置完函数,设置完名字,开始运行函数的时候才能析构完成
callback一下,其中的线程指针和线程名就是使用的thread_local来进行区分的。
协程
相比于线程只是运行一个函数来说,协程就是一个讲究人啊。其中设定了状态(设计模式中就有一个状态模式,咱这也是使用了设计模式的人上人了)。
协程的包装主要是控制的ucontext_t数据结构
其中
需要有三个参数需要设置
uc_link//后续上下文,一般置空指针
uc_stack.ss_sp//栈空间地址->对应malloc申请内存大小
uc_stack.ss_size//栈空间大小
包装有
getcontext //初始化,设置值(Fiber)
makecontext//类似于执行函数(Fiber或reset)
swapcontext//上下文切换,非常重要的函数。协程通过这个函数进行切换
其中总体结构为,设置一个主协程,在使用的时候通过主协程来进行控制
协程 <——>主协程<——>协程
每个线程中需要一个主协程在中间进行调度。
因此 thread_local关键字又开始派上用场了,
1
2
3static thread_local Fiber *t_fiber = nullptr;//当前执行线程 static thread_local Fiber::ptr t_threadFiber = nullptr;//主线程
那么就有一个问题,创建协程的时候一定需要有一个主协程,因此可以用写一个GetThis函数,每次要交换之前获取一下,有则直接获取,无则创建一
1
2
3
4
5
6
7
8
9
10Fiber::ptr Fiber::GetThis() { if (t_fiber) {//若当前没有正在运行的协程,因此必无主协程 return t_fiber->shared_from_this();//有就直接返回当前指针weakptr } Fiber::ptr main_fiber(new Fiber);//创建时已经将指针放入了t_fiber! SYLAR_ASSERT(t_fiber == main_fiber.get()) t_threadFiber = main_fiber; return t_fiber->shared_from_this(); }
协程的其他函数都是基本围绕着swapcontext函数展开的主要是
swapin 将主协程切换到当前协程
swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)
swapout 将当前协程切换到主协程
swapcontext(&m_ctx, &t_threadFiber->m_ctx)
swapin 比较好理解切入后状态都变为EXEC(正在执行)
协程切出后需要变成什么状态就更值得商榷。
1
2
3enum State { INIT, HOLD, EXEC, TERM, READY,EXCEPT }; // 初始化 保持 执行 结束 准备 异常
通过状态来判断下次调用时需要做什么工作,和线程不同的是他不swapin代码就不会执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31void Fiber::CallerMainFunc() { Fiber::ptr cur = GetThis();//这里赋值了一次引用计数+1 SYLAR_ASSERT(cur); try { cur->m_cb(); cur->m_cb = nullptr; cur->m_state = TERM; } catch (std::exception& ex) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what() << " fiber_id=" << cur->getId() << std::endl << sylar::BacktraceToString(); } catch (...) { cur->m_state = EXCEPT; SYLAR_LOG_ERROR(g_logger) << "Fiber Except" << " fiber_id=" << cur->getId() << std::endl << sylar::BacktraceToString(); } auto raw_ptr = cur.get();//此处为什么要用指针也很有说法 cur.reset(); raw_ptr->back();//执行此处又回到了主协程,若直接用cur会卡到此处不能释放,去执行主协程了,而由于cb()已经执行完了, //这里应该是需要销毁的状态,若直接使用只能指针会导致引用计数永远有1而不能释放 //永远不会执行到下面这一行,那么栈空间就不会被销毁 SYLAR_ASSERT2(false, "never reach fiber_id=" + std::to_string(raw_ptr->getId())); }
调度管理器
调度管理器,核心自然是为了调度,调度的是什么呢,他应该是任务,或者是要执行的函数,因此要将两者存储成一种形式是有必要的。下面这一个类就是用来描述这种形式的。需要注意的是,这一形式需要可以以不同的形式进行初始化,可以直接使用函数,或者直接使用协程来进行构造,还可以通过reset直接清空掉,或者通过无参函数先占个位置,在与要执行的线程号绑定在一起方便调度。//注意,这是结构体,所有参数都是公共的,因此可以直接赋值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18struct FiberAndThread { Fiber::ptr fiber; std::function<void()> cb; int thread; FiberAndThread(Fiber::ptr f, int thr) : fiber(f), thread(thr) {} FiberAndThread(Fiber::ptr *f, int thr) : thread(thr) { fiber.swap(*f); } FiberAndThread(std::function<void()> f, int thr) : cb(f), thread(thr) {} FiberAndThread(std::function<void()> *f, int thr) : thread(thr) { cb.swap(*f); } FiberAndThread() : thread(-1) {}//方便初始化 void reset() { fiber = nullptr; cb = nullptr; thread = -1; } };
对于调度管理器还有一个核心问题,那就是这个调度管理器应该在哪一个线程上呢?这一个线程没有主协程应该怎么办?又应该在哪一个协程上呢?
线程其实好解决,那个线程构造调度管理器就使用当前线程,因此创建管理器之前,首先要为创建管理器所在的线程上调用Fiber::GetThis()。目的是没有的话就创建一个主协程。那么应该在那一个协程上呢,很显然不会是一个线程的主协程,因为主协程忙着呢还有其他事要干,因此需要在创建一个协程,专职调度 。当然还有更简单粗暴的办法,创建一个线程专门给调度管理器使用!就不需要管这么多了。
1
2
3static thread_local Scheduler *t_scheduler = nullptr; static thread_local Fiber* t_fiber = nullptr;
很明显这表示了正在执行的管理器,和正在执行的协程。由于可能需要管理多个线程,因此没有线程指针也是可以理解的
解决了上述两个问题之后就到了令人头疼的正式调度程序了。在开始之前,怎么能忘记thread_local关键字呢
首先程序有两个可能的入口
1是通过start函数中的线程进入的
2是通过主协程里保存的函数进入的
对函数来说
首先是验证部分
当线程运行此函数时
第一步:遍历其所有的FiberAndThread 结构体并找到当前线程要运行的任务,若任务正在执行(EXEC)则跳过。
第二步:若要运行的协程
则判断其状态
(若协程不是结束TERM或异常EXCEPT 则执行线程(此处状态主要为HOLD,READY,INIT)
执行完后若是READY状态则再放回调度管理器内,要是没执行完就出来了则变为HOLD,(由于HOLD比较特殊,不能再由调度管理器代管,不然会再下一次循环中执行出来,因此将状态转为HOLD后就不再记录可以直接清除)
清除当前fiber
)
若当前需要执行的是函数的话
就创建一个协程,将函数装进去,执行此协程再执行上述步骤
若是空的(一般是经过上面执行完的Fiber会被reset为空)
则进行空闲状态的处理
空闲线程会在(stop函数执行后,且任务列表为空,且没有活跃线程,rootFiber未执行的时候将run函数结束掉
具体函数如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89void Scheduler::run() { SYLAR_LOG_INFO(g_logger)<<m_name<<"run"; setThis(); if(sylar::GetThreadId() != m_rootThread) {//判断是否为主线程 t_fiber=Fiber::GetThis().get(); } Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle,this))); Fiber::ptr cb_fiber; FiberAndThread ft; while (true) { ft.reset(); bool tickle_me = false; bool is_active=false; {//从消息队列中取出一个需要执行的消息 MutexType::Lock lock(m_mutex); auto it = m_fibers.begin(); while (it != m_fibers.end()) {//找到是否是线程id if (it->thread != -1 && it->thread != sylar::GetThreadId()) {//期望的线程id不是当前线程id ++it; tickle_me = true;//通知其他线程 continue; } SYLAR_ASSERT(it->fiber || it->cb); if (it->fiber && it->fiber->getState() == Fiber::EXEC) {//正在执行什么都不做 ++it; continue; } ft = *it; m_fibers.erase(it); ++m_activeThreadCount; is_active=true; break; } // tickle_me|=it!=m_fibers.end(); } if (tickle_me) { tickle(); } if (ft.fiber && (ft.fiber->getState() != Fiber::TERM&&ft.fiber->getState()!=Fiber::EXCEPT)) { ft.fiber->swapIn(); --m_activeThreadCount; if (ft.fiber->getState() == Fiber::READY) { schedule(ft.fiber); } else if (ft.fiber->getState() != Fiber::TERM && ft.fiber->getState() != Fiber::EXCEPT) { ft.fiber->m_state=Fiber::HOLD; } ft.reset(); } else if (ft.cb) { if (cb_fiber) { cb_fiber->reset(ft.cb); } else { cb_fiber.reset(new Fiber(ft.cb)); } ft.reset(); cb_fiber->swapIn(); --m_activeThreadCount; if (cb_fiber->getState() == Fiber::READY) { schedule(cb_fiber); cb_fiber.reset(); } else if (cb_fiber->getState() == Fiber::EXCEPT || cb_fiber->getState() == Fiber::TERM) { cb_fiber->reset(nullptr); } else {//if (cb_fiber->getState() != Fiber::TERM) { cb_fiber->m_state = Fiber::HOLD; cb_fiber.reset(); } } else { if (is_active) { --m_activeThreadCount; continue; } if (idle_fiber->getState() == Fiber::TERM) { SYLAR_LOG_INFO(g_logger)<<"idle fiber term"; break; } ++m_idleThreadCount; idle_fiber->swapIn(); --m_idleThreadCount; if (idle_fiber->getState() != Fiber::TERM && idle_fiber->getState() != Fiber::EXCEPT) {//若回来的状态不等于结束状态时 idle_fiber->m_state = Fiber::HOLD; } } } }
最后
以上就是谨慎小丸子最近收集整理的关于sylar库学习线程、协程、调度管理器sylar库学习线程与协程的全部内容,更多相关sylar库学习线程、协程、调度管理器sylar库学习线程与协程内容请搜索靠谱客的其他文章。
发表评论 取消回复