我是靠谱客的博主 饱满抽屉,这篇文章主要介绍「从零单排canal 04」 启动模块deployer源码解析1.入口类CanalLauncher2.启动类CanalStarter3.CanalController4.admin的配置监控原理5.总结,现在分享给大家,希望可以做个参考。

基于1.1.5-alpha版本,具体源码笔记可以参考我的github:https://github.com/saigu/JavaKnowledgeGraph/tree/master/code_reading/canal

本文将对canal的启动模块deployer进行分析。

Deployer模块(绿色部分)在整个系统中的角色如下图所示,用来启动canal-server.

在这里插入图片描述
模块内的类如下:

在这里插入图片描述
为了能带着目的看源码,以几个问题开头,带着问题来一起探索deployer模块的源码。

  • CanalServer启动过程中配置如何加载?
  • CanalServer启动过程中涉及哪些组件?
  • 集群模式的canalServer,是如何实现instance的HA呢?
  • 每个canalServer又是怎么获取admin上的配置变更呢?

1.入口类CanalLauncher

这个类是整个canal-server的入口类。负责配置加载和启动canal-server。

主流程如下:

  • 加载canal.properties的配置内容
  • 根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了
    • 如果是admin控制,使用PlainCanalConfigClient获取远程配置
    • 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法)
    • 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server
  • 核心是用canalStarter.start()启动
  • 使用CountDownLatch保持主线程存活
  • 收到关闭信号,CDL-1,然后关闭配置更新线程池,优雅退出
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
public static void main(String[] args) { try { //note:设置全局未捕获异常的处理 setGlobalUncaughtExceptionHandler(); /** * note: * 1.读取canal.properties的配置 * 可以手动指定配置路径名称 */ String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properties = new Properties(); if (conf.startsWith(CLASSPATH_URL_PREFIX)) { conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX); properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf)); } else { properties.load(new FileInputStream(conf)); } final CanalStarter canalStater = new CanalStarter(properties); String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER); /** * note: * 2.根据canal.admin.manager是否为空判断是否是admin控制,如果不是admin控制,就直接根据canal.properties的配置来了 */ if (StringUtils.isNotEmpty(managerAddress)) { String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); //省略一部分。。。。。。 /** * note: * 2.1使用PlainCanalConfigClient获取远程配置 */ final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress, user, passwd, registerIp, Integer.parseInt(adminPort), autoRegister, autoCluster); PlainCanal canalConfig = configClient.findServer(null); if (canalConfig == null) { throw new IllegalArgumentException("managerAddress:" + managerAddress + " can't not found config for [" + registerIp + ":" + adminPort + "]"); } Properties managerProperties = canalConfig.getProperties(); // merge local managerProperties.putAll(properties); int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties, CanalConstants.CANAL_AUTO_SCAN_INTERVAL, "5")); /** * note: * 2.2 新开一个线程池每隔五秒用http请求去admin上拉配置进行merge(这里依赖了instance模块的相关配置拉取的工具方法) */ executor.scheduleWithFixedDelay(new Runnable() { private PlainCanal lastCanalConfig; public void run() { try { if (lastCanalConfig == null) { lastCanalConfig = configClient.findServer(null); } else { PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5()); /** * note: * 2.3 用md5进行校验,如果canal-server配置有更新,那么就重启canal-server */ if (newCanalConfig != null) { // 远程配置canal.properties修改重新加载整个应用 canalStater.stop(); Properties managerProperties = newCanalConfig.getProperties(); // merge local managerProperties.putAll(properties); canalStater.setProperties(managerProperties); canalStater.start(); lastCanalConfig = newCanalConfig; } } } catch (Throwable e) { //.... } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); canalStater.setProperties(managerProperties); } else { canalStater.setProperties(properties); } canalStater.start(); //note: 这样用CDL处理和while(true)有点类似 runningLatch.await(); executor.shutdownNow(); } catch (Throwable e) { //...... } }

2.启动类CanalStarter

从上面的入口类,我们可以看到canal-server真正的启动逻辑在CanalStarter类的start方法。

这里先对三个对象进行辨析:

  • CanalController:是canalServer真正的启动控制器
  • canalMQStarter:用来启动mqProducer。如果serverMode选择了mq,那么会用canalMQStarter来管理mqProducer,将canalServer抓取到的实时变更用mqProducer直接投递到mq
  • CanalAdminWithNetty:这个不是admin控制台,而是对本server启动一个netty服务,让admin控制台通过请求获取当前server的信息,比如运行状态、正在本server上运行的instance信息等

start方法主要逻辑如下:

  • 根据配置的serverMode,决定使用CanalMQProducer或者canalServerWithNetty
  • 启动CanalController
  • 注册shutdownHook
  • 如果CanalMQProducer不为空,启动canalMQStarter(内部使用CanalMQProducer将消息投递给mq)
  • 启动CanalAdminWithNetty做服务器
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
public synchronized void start() throws Throwable { String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); /** * note * 1.如果canal.serverMode不是tcp,加载CanalMQProducer,并且启动CanalMQProducer * 回头可以深入研究下ExtensionLoader类的相关实现 */ if (!"tcp".equalsIgnoreCase(serverMode)) { ExtensionLoader<CanalMQProducer> loader = ExtensionLoader.getExtensionLoader(CanalMQProducer.class); canalMQProducer = loader .getExtension(serverMode.toLowerCase(), CONNECTOR_SPI_DIR, CONNECTOR_STANDBY_SPI_DIR); if (canalMQProducer != null) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(canalMQProducer.getClass().getClassLoader()); canalMQProducer.init(properties); Thread.currentThread().setContextClassLoader(cl); } } //note 如果启动了canalMQProducer,就不使用canalWithNetty(这里的netty是用在哪里的?) if (canalMQProducer != null) { MQProperties mqProperties = canalMQProducer.getMqProperties(); // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); if (mqProperties.isFlatMessage()) { // 设置为raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } } controller = new CanalController(properties); //note 2.启动canalController controller.start(); //note 3.注册了一个shutdownHook,系统退出时执行相关逻辑 shutdownThread = new Thread() { public void run() { try { controller.stop(); //note 主线程退出 CanalLauncher.runningLatch.countDown(); } catch (Throwable e) { } finally { } } }; Runtime.getRuntime().addShutdownHook(shutdownThread); //note 4.启动canalMQStarter,集群版的话,没有预先配置destinations。 if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(destinations); controller.setCanalMQStarter(canalMQStarter); } // start canalAdmin String port = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT); //note 5.根据填写的canalAdmin的ip和port,启动canalAdmin,用netty做服务器 if (canalAdmin == null && StringUtils.isNotEmpty(port)) { String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER); String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); CanalAdminController canalAdmin = new CanalAdminController(this); canalAdmin.setUser(user); canalAdmin.setPasswd(passwd); String ip = CanalController.getProperty(properties, CanalConstants.CANAL_IP); CanalAdminWithNetty canalAdminWithNetty = CanalAdminWithNetty.instance(); canalAdminWithNetty.setCanalAdmin(canalAdmin); canalAdminWithNetty.setPort(Integer.parseInt(port)); canalAdminWithNetty.setIp(ip); canalAdminWithNetty.start(); this.canalAdmin = canalAdminWithNetty; } running = true; }

3.CanalController

前面两个类都是比较清晰的,一个是入口类,一个是启动类,下面来看看核心逻辑所在的CanalController。

这里用了大量的匿名内部类实现接口,看起来有点头大,耐心慢慢剖析一下。

3.1 从构造器开始了解

整体初始化的顺序如下:

  • 构建PlainCanalConfigClient,用于用户远程配置的获取
  • 初始化全局配置,顺便把instance相关的全局配置初始化一下
  • 准备一下canal-server,核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq是不需要这个netty的)
  • 初始化zkClient
  • 初始化ServerRunningMonitors,作为instance 运行节点控制
  • 初始化InstanceAction,完成monitor机制。(监控instance配置变化然后调用ServerRunningMonitor进行处理)

这里有几个机制要详细介绍一下。

3.1.1 CanalServer两种模式

canalServer支持两种模式,CanalServerWithEmbedded和CanalServerWithNetty。

在构造器中初始化代码部分如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 3.准备canal server //note: 核心在于embededCanalServer,如果有需要canalServerWithNetty,那就多包装一个(我们serverMode=mq // 是不需要这个netty的) ip = getProperty(properties, CanalConstants.CANAL_IP); //省略一部分。。。 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); //省略一部分。。。 String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY); if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) { canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port); }

embededCanalServer:类型为CanalServerWithEmbedded

canalServer:类型为CanalServerWithNetty

二者有什么区别呢?

都实现了CanalServer接口,且都实现了单例模式,通过静态方法instance获取实例。

关于这两种类型的实现,canal官方文档有以下描述:
在这里插入图片描述
说白了,就是我们可以不必独立部署canal server。在应用直接使用CanalServerWithEmbedded直连mysql数据库进行订阅。

如果觉得自己的技术hold不住相关代码,就独立部署一个canal server,使用canal提供的客户端,连接canal server获取binlog解析后数据。而CanalServerWithNetty是在CanalServerWithEmbedded的基础上做的一层封装,用于与客户端通信。

在独立部署canal server时,Canal客户端发送的所有请求都交给CanalServerWithNetty处理解析,解析完成之后委派给了交给CanalServerWithEmbedded进行处理。因此CanalServerWithNetty就是一个马甲而已。CanalServerWithEmbedded才是核心。

因此,在构造器中,我们看到,用于生成CanalInstance实例的instanceGenerator被设置到了CanalServerWithEmbedded中,

而ip和port被设置到CanalServerWithNetty中。

关于CanalServerWithNetty如何将客户端的请求委派给CanalServerWithEmbedded进行处理,我们将在server模块源码分析中进行讲解。

3.1.2 ServerRunningMonitor

在CanalController的构造器中,canal会为每一个destination创建一个Instance,每个Instance都会由一个ServerRunningMonitor来进行控制。而ServerRunningMonitor统一由ServerRunningMonitors进行管理。

ServerRunningMonitor是做什么的呢?

我们看下它的属性就了解了。它主要用来记录每个instance的运行状态数据的。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/** * 针对server的running节点控制 */ public class ServerRunningMonitor extends AbstractCanalLifeCycle { private static final Logger logger = LoggerFactory.getLogger(ServerRunningMonitor.class); private ZkClientx zkClient; private String destination; private IZkDataListener dataListener; private BooleanMutex mutex = new BooleanMutex(false); private volatile boolean release = false; // 当前服务节点状态信息 private ServerRunningData serverData; // 当前实际运行的节点状态信息 private volatile ServerRunningData activeData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5; private ServerRunningListener listener; public ServerRunningMonitor(ServerRunningData serverData){ this(); this.serverData = serverData; } //。。。。。 }

在创建ServerRunningMonitor对象时,首先根据ServerRunningData创建ServerRunningMonitor实例,之后设置了destination和ServerRunningListener。

ServerRunningListener是个接口,这里采用了匿名内部类的形式构建,实现了各个接口的方法。

主要为instance在当前server上的状态发生变化时调用。比如要在当前server上启动这个instance了,就调用相关启动方法,如果在这个server上关闭instance,就调用相关关闭方法。

具体的调用逻辑我们后面在启动过程中分析,这里大概知道下构造器中做了些什么就行了,主要就是一些启动、关闭的逻辑。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
new Function<String, ServerRunningMonitor>() { public ServerRunningMonitor apply(final String destination) { ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData); runningMonitor.setDestination(destination); runningMonitor.setListener(new ServerRunningListener() { /** * note * 1.内部调用了embededCanalServer的start(destination)方法。 * 这里很关键,说明每个destination对应的CanalInstance是通过embededCanalServer的start方法启动的, * 这样我们就能理解,为什么之前构造器中会把instanceGenerator设置到embededCanalServer中了。 * embededCanalServer负责调用instanceGenerator生成CanalInstance实例,并负责其启动。 * * 2.如果投递mq,还会直接调用canalMQStarter来启动一个destination */ public void processActiveEnter() { //省略具体内容。。。 } /** * note * 1.与开始顺序相反,如果有mqStarter,先停止mqStarter的destination * 2.停止embedeCanalServer的destination */ public void processActiveExit() { //省略具体内容。。。 } /** * note * 在Canalinstance启动之前,destination注册到ZK上,创建节点 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。 * 此方法会在processActiveEnter()之前被调用 */ public void processStart() { //省略具体内容。。。 } /** * note * 在Canalinstance停止前,把ZK上节点删除掉 * 路径为:/otter/canal/destinations/{0}/cluster/{1},其0会被destination替换,1会被ip:port替换。 * 此方法会在processActiveExit()之前被调用 */ public void processStop() { //省略具体内容。。。 } }); if (zkclientx != null) { runningMonitor.setZkClient(zkclientx); } // 触发创建一下cid节点 runningMonitor.init(); return runningMonitor; } }

3.2 canalController的start方法

具体运行逻辑如下:

  • 在zk的/otter/canal/cluster目录下根据ip:port创建server的临时节点,注册zk监听器
  • 先启动embededCanalServer(会启动对应的监控)
  • 根据配置的instance的destination,调用runningMonitor.start() 逐个启动instance
  • 如果cannalServer不为空,启动canServer (canalServerWithNetty)

这里需要注意,canalServer什么时候为空?

如果用户选择了serverMode为mq,那么就不会启动canalServerWithNetty,采用mqStarter来作为server,直接跟mq集群交互。canalServerWithNetty只有在serverMode为tcp时才启动,用来跟canal-client做交互。

所以如果以后想把embeddedCanal嵌入自己的应用,可以考虑参考mqStarter的写法。后面我们在server模块中会做详细解析。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public void start() throws Throwable { // 创建整个canal的工作节点 final String path = ZookeeperPathUtils.getCanalClusterNode(registerIp + ":" + port); initCid(path); if (zkclientx != null) { this.zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception{ logger.error("failed to connect to zookeeper", error); } }); } // 先启动embeded服务 embededCanalServer.start(); // 尝试启动一下非lazy状态的通道 for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) { final String destination = entry.getKey(); InstanceConfig config = entry.getValue(); // 创建destination的工作节点 if (!embededCanalServer.isStart(destination)) { // HA机制启动 ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } } //note:为每个instance注册一个配置监视器 if (autoScan) { instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction); } } if (autoScan) { //note:启动线程定时去扫描配置 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } } // 启动网络接口 if (canalServer != null) { canalServer.start(); } }

我们重点关注启动instance的过程,也就是ServerRunningMonitor的运行机制,也就是HA启动的关键。

入口在runningMonitor.start()。

  • 如果zkClient != null,就用zk进行HA启动
  • 否则,就直接processActiveEnter启动,这个我们前面已经分析过了
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public synchronized void start() { super.start(); try { /** * note * 内部会调用ServerRunningListener的processStart()方法 */ processStart(); if (zkClient != null) { // 如果需要尽可能释放instance资源,不需要监听running节点,不然即使stop了这台机器,另一台机器立马会start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { /** * note * 内部直接调用ServerRunningListener的processActiveEnter()方法 */ processActiveEnter();// 没有zk,直接启动 } } catch (Exception e) { logger.error("start failed", e); // 没有正常启动,重置一下状态,避免干扰下一次start stop(); } }

重点关注下HA启动方式,一般 我们都采用这种模式进行。

在集群模式下,可能会有多个canal server共同处理同一个destination,

在某一时刻,只能由一个canal server进行处理,处理这个destination的canal server进入running状态,其他canal server进入standby状态。

同时,通过监听对应的path节点,一旦发生变化,出现异常,可以立刻尝试自己进入running,保证了instace的 高可用!!

启动的重点还是在initRuning()。

利用zk来保证集群中有且只有 一个instance任务在运行。

  • 还构建一个临时节点的路径:/otter/canal/destinations/{0}/running
  • 尝试创建临时节点。
  • 如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。此时会抛出ZkNodeExistsException,进入catch代码块。
  • 如果创建成功,就说明没有其他server启动这个instance,可以创建
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private void initRunning() { if (!isStart()) { return; } //note: 还是一样构建一个临时节点的路径:/otter/canal/destinations/{0}/running String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); /** * note: * 尝试创建临时节点。如果节点已经存在,说明是其他的canal server已经启动了这个canal instance。 * 此时会抛出ZkNodeExistsException,进入catch代码块。 */ zkClient.create(path, bytes, CreateMode.EPHEMERAL); /** * note: * 如果创建成功,就开始触发启动事件 */ activeData = serverData; processActiveEnter();// 触发一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { /** * note: * 如果捕获异常,表示创建失败。 * 就根据临时节点路径查一下是哪个canal-sever创建了。 * 如果没有相关信息,马上重新尝试一下。 * 如果确实存在,就把相关信息保存下来 */ bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在节点,立即尝试一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { /** * note: * 如果是父节点不存在,那么就尝试创建一下父节点,然后再初始化。 */ zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 尝试创建父节点 initRunning(); } }

那运行中的HA是如何实现的呢,我们回头看一下

复制代码
1
2
zkClient.subscribeDataChanges(path, dataListener);

对destination对应的running节点进行监听,一旦发生了变化,则说明可能其他处理相同destination的canal server可能出现了异常,此时需要尝试自己进入running状态。

dataListener是在ServerRunningMonitor的构造方法中初始化的,

包括节点发生变化、节点被删两种变化情况以及相对应的处理逻辑,如下 :

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public ServerRunningMonitor(){ // 创建父节点 dataListener = new IZkDataListener() { /** * note: * 当注册节点发生变化时,会自动回调这个方法。 * 我们回想一下使用过程中,什么时候可能 改变节点当状态呢? * 大概是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive。 * 可以 触发 HA。 */ public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 说明出现了主动释放的操作,并且本机之前是active releaseRunning();// 彻底释放mainstem } activeData = (ServerRunningData) runningData; } /** * note: * 如果其他canal instance出现异常,临时节点数据被删除时,会自动回调这个方法,此时当前canal instance要顶上去 */ public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的状态就是本机,则即时触发一下active抢占 initRunning(); } else { // 否则就是等待delayTime,避免因网络异常或者zk异常,导致出现频繁的切换操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; }

当注册节点发生变化时,会自动回调zkListener的handleDataChange方法。

我们回想一下使用过程中,什么时候可能 改变节点当状态呢?

就是在控制台中,对canal-server中正在运行的 instance做"停止"操作时,改变了isActive,可以 触发 HA。

如下图所示
在这里插入图片描述

4.admin的配置监控原理

我们现在采用admin做全局的配置控制。

那么每个canalServer是怎么监控配置的变化呢?

还记得上吗cananlController的start方法中对配置监视器的启动吗?

复制代码
1
2
3
4
5
6
7
8
9
10
11
if (autoScan) { //note:启动线程定时去扫描配置 instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); //note:这部分代码似乎没有用,目前只能是manager或者spring两种方式二选一 for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } }

这个就是关键的配置监控。

我们来看deployer模块中的monitor包了。
在这里插入图片描述

4.1 InstanceAction

是一个接口,有四个方法,用来获取配置后,对具体instance采取动作。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** * config配置变化后的动作 * * @author jianghang 2013-2-18 下午01:19:29 * @version 1.0.1 */ public interface InstanceAction { /** * 启动destination */ void start(String destination); /** * 主动释放destination运行 */ void release(String destination); /** * 停止destination */ void stop(String destination); /** * 重载destination,可能需要stop,start操作,或者只是更新下内存配置 */ void reload(String destination); }

具体实现在canalController的构造器中实现了匿名类。

4.2 InstanceConfigMonitor

这个接口有两个实现,一个是基于spring的,一个基于manager(就是admin)。

我们看下基于manager配置的实现的ManagerInstanceConfigMonitor即可。

原理很简单。

采用一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置
然后通过defaultAction去start
这个start在canalController的构造器的匿名类中实现,会使用instance对应的runningMonitor做HA启动。具体逻辑上一小节已经详细介绍过了。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/** * 基于manager配置的实现 * * @author agapple 2019年8月26日 下午10:00:20 * @since 1.1.4 */ public class ManagerInstanceConfigMonitor extends AbstractCanalLifeCycle implements InstanceConfigMonitor, CanalLifeCycle { private static final Logger logger = LoggerFactory.getLogger(ManagerInstanceConfigMonitor.class); private long scanIntervalInSecond = 5; private InstanceAction defaultAction = null; /** * note: * 每个instance对应的instanceAction,实际上我们看代码发现都是用的同一个defaultAction */ private Map<String, InstanceAction> actions = new MapMaker().makeMap(); /** * note: * 每个instance对应的远程配置 */ private Map<String, PlainCanal> configs = MigrateMap.makeComputingMap(new Function<String, PlainCanal>() { public PlainCanal apply(String destination) { return new PlainCanal(); } }); /** * note: * 一个固定大小线程池,每隔5s,使用PlainCanalConfigClient去拉取instance配置 */ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("canal-instance-scan")); private volatile boolean isFirst = true; /** * note: * 拉取admin配置的client */ private PlainCanalConfigClient configClient; //… }

5.总结

deployer模块的主要作用:

1)读取canal.properties,确定canal instance的配置加载方式。如果使用了admin,那么还会定时拉取admin上的配置更新。

2)确定canal-server的启动方式:独立启动或者集群方式启动

3)利用zkClient监听canal instance在zookeeper上的状态变化,动态停止、启动或新增,实现了instance的HA

4)利用InstanceConfigMonitor,采用固定线程定时轮训admin,获取instance的最新配置

5)启动canal server,监听客户端请求

这里还有个非常有意思的问题没有展开说明,那就是CanalStarter里面的配置加载,通过ExtensionLoader类的相关实现,如何通过不同的类加载器,实现SPI,后面再分析吧。

看到这里了,原创不易,点个赞吧,你最好看了~

知识碎片重新梳理,构建Java知识图谱:https://github.com/saigu/JavaKnowledgeGraph (历史文章查阅非常方便)

最后

以上就是饱满抽屉最近收集整理的关于「从零单排canal 04」 启动模块deployer源码解析1.入口类CanalLauncher2.启动类CanalStarter3.CanalController4.admin的配置监控原理5.总结的全部内容,更多相关「从零单排canal内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(113)

评论列表共有 0 条评论

立即
投稿
返回
顶部