我是靠谱客的博主 生动自行车,这篇文章主要介绍etcd网络层源码分析(三)——Peer接口实现,现在分享给大家,希望可以做个参考。

通过上一节分析我们知道,rafthttp.Transporter接口实现中多个方法是通过调用Peer实例的相应方法实现的,个人理解为rafthttp.Transporter为网络传输的抽象。在分布式系统中的通信,其实就是节点之间的通信,以此细分出Peer节点的抽象。

结构体

复制代码
1
2
3
4
5
6
7
8
9
10
11
type Peer interface { //发送单个消息,该方法是非阻塞的,如果出现发送失败,则会将失败信息报告给底层的Raft接口 send(m raftpb.Message) sendSnap(m snap.Message)//发送snap.Message,其他行为与上面的send()方法类似 update(urls types.URLs)//更新对应节点暴露的URL地址 //将指定的连接与当前的Peer绑定,Peer会将该连接作为Stream消息通道使用 //当Peer不再使用该连接时,会将该连接关闭 attachOutgoingConn(conn *outgoingConn) stop()//关闭当前Peer实例,会关闭底层的网络连接 }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type peer struct { localID types.ID//本地当前的节点ID id types.ID//该peer实例对应的节点ID r Raft //Raft接口,在Raft接口实现的底层封装了etcd-raft模块。 status *peerStatus //节点的状态 picker *urlPicker//每个节点可能提供了多个URL供其他节点访问,当其中一个访问失败时,我们应该可以尝试访问另一个。而urlPicker提供的主要功能就是这些URL之间进行切换。 msgAppV2Writer *streamWriter //v2版本负责向Stream消息通道写入消息 writer *streamWriter //v3版本负责向Stream消息通道写入消息 pipeline *pipeline //pipeline消息通道 snapSender *snapshotSender //负责发送快照数据 msgAppV2Reader *streamReader//v2版本负责从Stream消息通道读取消息 msgAppReader *streamReader//v3版本负责从Stream消息通道读取消息 recvc chan raftpb.Message //从Stream消息通道中读取消息之后,会通过该通道将消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理。 propc chan raftpb.Message//从Stream消息通道中读取到MsgProp类型的消息之后,会通过该通道将MsgProp消息交给Raft接口,然后由它返回给底层etcd-raft模块进行处理。 paused bool//是否暂停向对应的节点发送消息 }

peer启动

在上一节中有提到过在Transport的方法AddPeer会启动startPeer

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer { ... status := newPeerStatus(t.Logger, t.ID, peerID)//初始化peer状态 picker := newURLPicker(urls)//根据节点的URL创建urlPicker errorc := t.ErrorC r := t.Raft //底层的Raft状态机 pipeline := &pipeline{ //创建pipeline实例 peerID: peerID, tr: t, picker: picker, status: status, followerStats: fs, raft: r, errorc: errorc, } pipeline.start()//启动pipeline p := &peer{//创建Peer实例 lg: t.Logger, localID: t.ID, id: peerID, r: r, status: status, picker: picker, msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),//创建并启动v2版本的streamWriter writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),//创建并启动v3版本的streamWriter pipeline: pipeline, snapSender: newSnapshotSender(t, picker, peerID, status), recvc: make(chan raftpb.Message, recvBufSize),//创建recvc通道,注意缓冲区大小 propc: make(chan raftpb.Message, maxPendingProposals),//创建propc通道,注意缓冲区大小 stopc: make(chan struct{}), } ctx, cancel := context.WithCancel(context.Background()) p.cancel = cancel /* 启动单独的goroutine,它主要负责将recvc通道中读取消息,该通道中的消息就是从对端节点发送过来 的消息,然后将读取到的消息交给底层的Raft状态机进行处理。 */ go func() { for { select { case mm := <-p.recvc://从recvc通道中获取连接上读取到的Message //将Message交给底层Raft状态机处理 if err := r.Process(ctx, mm); err != nil { if t.Logger != nil { t.Logger.Warn("failed to process Raft message", zap.Error(err)) } } case <-p.stopc: return } } }() /* 底层Raft状态机处理MsgProp类型的Message时,可能会阻塞,所以启动单独的goroutine来处理 */ go func() { for { select { case mm := <-p.propc: if err := r.Process(ctx, mm); err != nil {//从propc通道中获取MsgProp类型的Message //将Message交给底层Raft状态机处理 if t.Logger != nil { t.Logger.Warn("failed to process Raft message", zap.Error(err)) } } case <-p.stopc: return } } }() //创建并启动v2版本的streamReader实例,它主要负责从Stream消息通道上读取消息 p.msgAppV2Reader = &streamReader{ lg: t.Logger, peerID: peerID, typ: streamTypeMsgAppV2, tr: t, picker: picker, status: status, recvc: p.recvc, propc: p.propc, rl: rate.NewLimiter(t.DialRetryFrequency, 1), } //创建并启动v3版本的streamReader实例,它主要负责从Stream消息通道上读取消息 p.msgAppReader = &streamReader{ lg: t.Logger, peerID: peerID, typ: streamTypeMessage, tr: t, picker: picker, status: status, recvc: p.recvc, propc: p.propc, rl: rate.NewLimiter(t.DialRetryFrequency, 1), } p.msgAppV2Reader.start() p.msgAppReader.start() ... }

rafthttp.peer.startPeer()方法在启动的时候会启动pipeline和stream的两种消息通道类型,pipeline是短连接的,stream则是HTTP长连接,所以stream需要专门负责写入数据的streamWriter和专门读取数据的streamReader。
最终pipeline和stream接收到的消息都会被写入到peer.recvc和peer.propc再由etcd-raft模块统一处理。

send发送消息

上一节中Transport的方法send就是通过调用rafthttp.peer.send方法来实现消息发送功能的。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (p *peer) send(m raftpb.Message) { p.mu.Lock() paused := p.paused//检测paused字段,是否暂停对指定节点发送消息 p.mu.Unlock() if paused { return } //根据消息的类型选择合适的消息通道 writec, name := p.pick(m) select { case writec <- m://根据消息的类型将Message写入writc通道中,等待发送 default: //如果发送出现阻塞,则将信息报告给底层raft状态机, p.r.ReportUnreachable(m.To) if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } } }

在发送消息的时候,会根据消息的类型选择合适的消息通道,并返回相应的通道供rafthttp.peer.send()方法写入待发送的消息,这个选择通道的方法是由peer.pick()方法来完成的。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { var ok bool //如果是MsgSnap类型的消息,则返回Pipeline消息通道对应的Channel,否则返回Stream消息通道 if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg } return p.pipeline.msgc, pipelineMsg }
  1. 如果该消息是MsgSnap快照消息,则选择pipeline.msgc通道
  2. 如果消息类型是MsgApp并且v2版本的streamWriter处于工作状态,则选择v2版本的streamWriter.msgc通道
  3. 如果v3版本的streamWriter处于工作状态,则选择v3版本的streamWriter.msgc通道
  4. 最后,如果都不符合则直接使用pipeline.msgc通道

attachOutgoingConn获得连接

上一节中Transport的方法Handler提到过streamHandler会调用attachOutgoingConn的方法。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { case streamTypeMsgAppV2: ok = p.msgAppV2Writer.attach(conn)//v2版本streamWriter绑定连接 case streamTypeMessage: ok = p.writer.attach(conn)//v3版本streamWriter绑定连接 default: if p.lg != nil { p.lg.Panic("unknown stream type", zap.String("type", conn.t.String())) } } if !ok { conn.Close() } }

前面提到stream消息通道类型是HTTP长连接的,rafthttp.peer.attachOutgoingConn作用就是独自对网络连接做了一层封装,供streamWriter进行绑定,绑定之后http处理器就可以将缓存区的数据刷到对端,这个方法是为stream消息通道实现的。

更多欢迎关注go成神之路

最后

以上就是生动自行车最近收集整理的关于etcd网络层源码分析(三)——Peer接口实现的全部内容,更多相关etcd网络层源码分析(三)——Peer接口实现内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(83)

评论列表共有 0 条评论

立即
投稿
返回
顶部