This is part 4 of the series on real-time incremental sync with canal + kafka + flink: a demo of sinking Kafka messages into MySQL. It covers three parts: 1. creating three tables; 2. the functionality to implement; 3. the demo.

1. Create the three tables

(1) The main table, user_id. Its primary key is user_id.

CREATE TABLE `user_id` (
  `user_id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `user_info_id` int(11) DEFAULT NULL,
  PRIMARY KEY (`user_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

(2) The detail table, user_info.

CREATE TABLE `user_info` (
  `sex` int(1) DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_info_id` int(11) NOT NULL,
  PRIMARY KEY (`user_info_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

(3) The merged table, user.

CREATE TABLE `user` (
  `id` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `sex` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `user_name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `password` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `tele` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

2. Functionality to implement

We need to consume the incremental messages pushed to Kafka and write each one into the same-named table in the target database. For example, syncing from database A to database B: whenever a row in an A table is inserted, updated, or deleted, a message is pushed to Kafka; B consumes that message and applies it to the corresponding B table (this is the incremental sync). In addition, the mapping configuration that accompanies the Kafka messages tells us which source tables must also be merged into a new table, call it C, whose columns are drawn from both source tables.
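As a rough sketch of the consuming side (my own illustration, not the project's actual code; the broker address and topic are placeholders echoing the config in section (2) below):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Minimal consumer-side sketch: poll binlog messages from Kafka and hand each
// one to a sink that writes the same-named table in database B plus any
// configured merged table.
public class IncrementalSyncConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx:9011");   // placeholder broker
        props.put("group.id", "sync-demo");          // hypothetical group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testtopic3"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // each record.value() is one OGG-format JSON binlog message;
                    // the sink (not shown) applies it to the target tables
                    System.out.println("consumed: " + record.value());
                }
            }
        }
    }
}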

In the test we hard-code the Kafka messages. (1) below shows the binlog messages as they would be consumed from Kafka: two messages, one for the user_info table and one for the user_id table. We write these two messages into the same-named tables in the new database, and also merge them into the new user table. The merge rules and the Kafka mapping are shown in (2).

(1) op_type "I" means insert, "U" means update, and "D" means delete (a small dispatch sketch follows the two messages below).

messageList.add("{\n" +
        "\t\"after\": {\n" +
        "\t\t\"user_info_id\": \"4\",\n" +
        "\t\t\"sex\": \"1\",\n" +
        "\t\t\"user_name\": \"admin\",\n" +
        "\t\t\"tele\": \"183xxx\",\n" +
        "\t\t\"password\": \"123\"\n" +
        "\t},\n" +
        "\t\"before\": {},\n" +
        "\t\"op_ts\": \"1\",\n" +
        "\t\"op_type\": \"I\",\n" +
        "\t\"primary_keys\": [\"user_info_id\"],\n" +
        "\t\"schema\": \"crm1\",\n" +
        "\t\"table\": \"user_info\"\n" +
        "}");
messageList.add("{\n" +
        "\t\"after\": {\n" +
        "\t\t\"user_id\": \"2\",\n" +
        "\t\t\"name\": \"rui\",\n" +
        "\t\t\"age\": \"100\",\n" +
        "\t\t\"user_info_id\": \"4\"\n" +
        "\t},\n" +
        "\t\"before\": {},\n" +
        "\t\"op_ts\": \"1\",\n" +
        "\t\"op_type\": \"I\",\n" +
        "\t\"primary_keys\": [\"user_id\"],\n" +
        "\t\"schema\": \"crm1\",\n" +
        "\t\"table\": \"user_id\"\n" +
        "}");
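To make the op_type semantics concrete, here is a minimal dispatch sketch (my own illustration, assuming fastjson for parsing; the SQL bodies are elided):

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

// Illustration only: map a message's op_type to the SQL operation that the
// sink applies on the target side.
public class OpTypeDispatcher {
    public static String toSqlVerb(String message) {
        JSONObject msg = JSON.parseObject(message);
        String table = msg.getString("table");
        switch (msg.getString("op_type")) {
            case "I": return "INSERT INTO " + table + " ...";  // columns/values from "after"
            case "U": return "UPDATE " + table + " SET ...";   // "after" values, keyed by primary_keys
            case "D": return "DELETE FROM " + table + " ...";  // keyed by primary_keys
            default:  throw new IllegalArgumentException("unknown op_type");
        }
    }
}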

(2) The mapping configuration, hard-coded as the ktssubscriberJsonStr string and shown below pretty-printed as JSON. From it we can read off the routing: user_id → the user_id table, user_info → the user_info table, and user_id + user_info → the user table.

{
  "url": "jdbc:mysql://xx:3306/test?rewriteBatchedStatements=true",
  "username": "dataopen",
  "password": "dataopen",
  "type": "rds",
  "kafkaBrokers": "xx:9011",
  "kafkaTopic": "testtopic3",
  "kafkaPartition": "",
  "kafkaOffsetReset": "latest",
  "kafkaConsumerGroup": "",
  "kafkaOffsetsForTimes": "",
  "kafkaUserName": "",
  "kafkaPassword": "",
  "dataFormatType": "JSON",
  "dataFormat": "OGG",
  "relation": [
    {
      "partition": "",
      "sourceSchema": "crm1",
      "sourceTable": "user_id",
      "targetSchema": "test",
      "targetTable": "user_id",
      "operaType": "I,U,D",
      "where": "",
      "tableType": "sub",
      "column": [
        { "sourceColumn": "user_id",      "columnCode": "user_id",      "columnType": "int",     "columnValue": "user_id" },
        { "sourceColumn": "name",         "columnCode": "name",         "columnType": "varchar", "columnValue": "name" },
        { "sourceColumn": "age",          "columnCode": "age",          "columnType": "int",     "columnValue": "age" },
        { "sourceColumn": "user_info_id", "columnCode": "user_info_id", "columnType": "int",     "columnValue": "user_info_id" }
      ]
    },
    {
      "partition": "",
      "sourceSchema": "crm1",
      "sourceTable": "user_info",
      "targetSchema": "test",
      "targetTable": "user_info",
      "operaType": "I,U,D",
      "where": "",
      "tableType": "sub",
      "column": [
        { "sourceColumn": "user_info_id", "columnCode": "user_info_id", "columnType": "int",     "columnValue": "user_info_id" },
        { "sourceColumn": "sex",          "columnCode": "sex",          "columnType": "int",     "columnValue": "sex" },
        { "sourceColumn": "user_name",    "columnCode": "user_name",    "columnType": "varchar", "columnValue": "user_name" },
        { "sourceColumn": "password",     "columnCode": "password",     "columnType": "varchar", "columnValue": "password" },
        { "sourceColumn": "tele",         "columnCode": "tele",         "columnType": "varchar", "columnValue": "tele" }
      ]
    },
    {
      "partition": "",
      "sourceSchema": "crm1",
      "sourceTable": "user_id",
      "targetSchema": "test",
      "targetTable": "user",
      "operaType": "I,U,D",
      "where": "",
      "tableType": "main",
      "column": [
        { "sourceColumn": "user_id", "columnCode": "id",   "columnType": "int",     "columnValue": "user_id", "isPrimary": "Y" },
        { "sourceColumn": "name",    "columnCode": "name", "columnType": "varchar", "columnValue": "name" },
        { "sourceColumn": "age",     "columnCode": "age",  "columnType": "int",     "columnValue": "age" }
      ]
    },
    {
      "partition": "",
      "sourceSchema": "crm1",
      "sourceTable": "user_info",
      "targetSchema": "test",
      "targetTable": "user",
      "operaType": "I,U,D",
      "where": "",
      "tableType": "sub",
      "column": [
        { "sourceColumn": "user_info_id", "columnCode": "user_info_id", "columnType": "int",     "isPrimary": "Y", "columnValue": "user_info_id" },
        { "sourceColumn": "sex",          "columnCode": "sex",          "columnType": "int",     "columnValue": "sex" },
        { "sourceColumn": "user_name",    "columnCode": "user_name",    "columnType": "varchar", "columnValue": "user_name" },
        { "sourceColumn": "password",     "columnCode": "password",     "columnType": "varchar", "columnValue": "password" },
        { "sourceColumn": "tele",         "columnCode": "tele",         "columnType": "varchar", "columnValue": "tele" }
      ]
    }
  ]
}

3. Demo

(1) First, check that all three tables are currently empty:

(2) After running the code:

user table: user now contains one new row whose fields come from the user_id and user_info tables respectively, i.e., the two Kafka messages shown in section 2 above.

user_id table:

user_info table:

(3) Now modify the user table data and run again: because a row with id 2 already exists (id maps to the user_id primary key of the user_id main table), the insert is automatically converted into an update. The user_id and user_info tables are left unchanged for the same reason: the sink checks by primary key, so even an insert-type message is turned into an update when the row already exists.

After running the code:
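The "insert becomes update" behavior described above is commonly implemented in MySQL as an upsert; a minimal sketch (my own, not necessarily this project's exact SQL) over the merged user table:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Illustration only: INSERT ... ON DUPLICATE KEY UPDATE turns an insert-type
// message into an update whenever a row with the same primary key exists.
public class UpsertDemo {
    public static void main(String[] args) throws Exception {
        String sql = "INSERT INTO `user` (id, name, age) VALUES (?, ?, ?) "
                   + "ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age)";
        // connection details are placeholders mirroring the config above
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://xx:3306/test?rewriteBatchedStatements=true",
                 "dataopen", "dataopen");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setInt(1, 2);           // user_id from the binlog message
            ps.setString(2, "rui2");   // changed value: existing row id=2 gets updated
            ps.setInt(3, 100);
            ps.executeUpdate();
        }
    }
}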

Summary:

That covers the Kafka-message-to-MySQL sink. Combined with the previous article, this completes a Canal + Kafka + Flink/Spark Streaming incremental sync pipeline, with configurable merging into specific target tables. The code itself will be analyzed in the next post.
