springboot 整合 hbase
要确定这三个端口外部可以访问(一般是 ZooKeeper 的 2181、HMaster 的 16000 和 RegionServer 的 16020,以实际配置为准)
如果是127.0.0.1 可以参考修改
Linux下Hbase安装配置
复制代码
1
2
3
4
5
6
7
<property>
  <name>hbase.master.ipc.address</name>
  <value>0.0.0.0</value>
</property>
<property>
  <name>hbase.regionserver.ipc.address</name>
  <value>0.0.0.0</value>
</property>
配置linux服务器hosts
复制代码
vim /etc/hosts
复制代码
1
2
3
127.0.0.1 VM-16-8-centos VM-16-8-centos
127.0.0.1 localhost.localdomain localhost
127.0.0.1 localhost6.localdomain6 localhost6
ip vm-16-8-centos
配置 Windows hosts
复制代码
ip vm-16-8-centos
项目结构
pom.xml
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>hbase</name>
    <description>springBoot_hBase Demo</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- HBase client; keep this version in line with the HBase server being connected to -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.7</version>
        </dependency>

        <!-- Lombok is a compile-time annotation processor only: the version is inherited from the
             Spring Boot parent's dependency management instead of being pinned here, and the
             dependency is optional so it is not passed on to consumers of this artifact. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
HbaseConfig
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.springboot.hbase.config.hbase;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

/**
 * Holds the HBase client settings bound from configuration properties under the
 * {@value #CONF_PREFIX} prefix.
 *
 * <p>Each entry of {@code confMaps} is a key/value pair of client configuration.
 *
 * @author sixmonth
 */
@Configuration
@ConfigurationProperties(prefix = HbaseConfig.CONF_PREFIX)
public class HbaseConfig {

    /** Property prefix under which the HBase settings are declared. */
    public static final String CONF_PREFIX = "hbase.conf";

    /** HBase configuration key/value pairs. */
    private Map<String, String> confMaps;

    /**
     * Stores the configuration entries.
     *
     * @param entries HBase configuration key/value pairs
     */
    public void setconfMaps(Map<String, String> entries) {
        confMaps = entries;
    }

    /**
     * Returns the stored configuration entries.
     *
     * @return the configuration map, or {@code null} when none has been set
     */
    public Map<String, String> getconfMaps() {
        return this.confMaps;
    }
}
HBaseUtils
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
package com.springboot.hbase.config.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import org.springframework.util.StopWatch;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * HBase helper built around one shared {@link Connection} and {@link Admin}.
 *
 * <p>The connection and admin handles are static and are created the first time the
 * (private) constructor runs. Every {@link Table} handle obtained from the connection
 * is closed again before the method that opened it returns.
 *
 * @author sixmonth
 */
@DependsOn("springContextHolder") // the springContextHolder bean must be initialised before this bean is created
@Component
public class HBaseUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseUtils.class);

    // Looked up by bean name through SpringContextHolder because static fields cannot be injected.
    private static HbaseConfig hbaseConfig = SpringContextHolder.getBean("hbaseConfig");
    private static Configuration conf = HBaseConfiguration.create();
    // Thread pool handed to the HBase connection.
    private static ExecutorService pool = Executors.newScheduledThreadPool(20);
    private static Connection connection = null;
    private static HBaseUtils instance = null;
    private static Admin admin = null;

    /**
     * Creates the shared connection and admin handle if they do not exist yet.
     *
     * <p>An {@link IOException} while connecting is logged and not rethrown, so
     * {@code connection} and {@code admin} may still be {@code null} afterwards.
     */
    private HBaseUtils() {
        if (connection == null) {
            try {
                // Copy every entry declared in HbaseConfig into the client configuration.
                Map<String, String> confMap = hbaseConfig.getconfMaps();
                if (confMap != null) {
                    for (Map.Entry<String, String> confEntry : confMap.entrySet()) {
                        conf.set(confEntry.getKey(), confEntry.getValue());
                    }
                }
                connection = ConnectionFactory.createConnection(conf, pool);
                admin = connection.getAdmin();
            } catch (IOException e) {
                LOGGER.error("HbaseUtils实例初始化失败!错误信息为:" + e.getMessage(), e);
            }
        }
    }

    /**
     * Simple lazily-created singleton accessor; not needed when the bean is injected by Spring.
     *
     * @return the shared instance
     */
    public static synchronized HBaseUtils getInstance() {
        if (instance == null) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * Creates a table with the given column families.
     *
     * <p>NOTE(review): when the table already exists it is disabled and deleted, an error
     * is logged, and the table is NOT recreated. That is the original behaviour and is kept
     * here; confirm whether "drop and recreate" or "leave untouched" was intended.
     *
     * @param tableName    table name
     * @param columnFamily column family names
     * @throws IOException if an admin call fails
     */
    public void createTable(String tableName, String[] columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            admin.disableTable(name);
            admin.deleteTable(name);
            LOGGER.error("create htable error! this table {} already exists!", name);
            return;
        }
        TableDescriptorBuilder descriptor = TableDescriptorBuilder.newBuilder(name);
        for (String cf : columnFamily) {
            descriptor.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf));
        }
        admin.createTable(descriptor.build());
    }

    /**
     * Writes several columns of one column family into a single row.
     *
     * <p>All columns are collected into one {@link Put} and sent once. Nothing is written
     * when {@code columns} is empty.
     *
     * @param tableName     table name
     * @param row           row key
     * @param columnFamilys column family name
     * @param columns       column qualifiers
     * @param values        values; {@code values[i]} is stored under {@code columns[i]}
     * @throws IllegalArgumentException if {@code values} has fewer elements than {@code columns}
     * @throws IOException              if the write fails
     */
    public void insertRecords(String tableName, String row, String columnFamilys, String[] columns, String[] values)
            throws IOException {
        if (values.length < columns.length) {
            // Fail before anything is written instead of part-way through the row.
            throw new IllegalArgumentException(
                    "values has " + values.length + " elements but columns has " + columns.length);
        }
        if (columns.length == 0) {
            return;
        }
        byte[] family = Bytes.toBytes(columnFamilys);
        Put put = new Put(Bytes.toBytes(row));
        for (int i = 0; i < columns.length; i++) {
            put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
        }
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    /**
     * Writes a single column value into a row.
     *
     * @param tableName    table name
     * @param row          row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param value        value to store
     * @throws IOException if the write fails
     */
    public void insertOneRecord(String tableName, String row, String columnFamily, String column, String value)
            throws IOException {
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    /**
     * Deletes a whole row.
     *
     * @param tablename table name
     * @param rowkey    row key
     * @throws IOException if the delete fails
     */
    public void deleteRow(String tablename, String rowkey) throws IOException {
        // Bytes.toBytes encodes as UTF-8, the same encoding the insert methods use for row keys.
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(delete);
        }
    }

    /**
     * Deletes one column family of a row.
     *
     * @param tablename    table name
     * @param rowkey       row key
     * @param columnFamily column family name
     * @throws IOException if the delete fails
     */
    public void deleteColumnFamily(String tablename, String rowkey, String columnFamily) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowkey)).addFamily(Bytes.toBytes(columnFamily));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(delete);
        }
    }

    /**
     * Deletes one column of a row.
     *
     * @param tablename    table name
     * @param rowkey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @throws IOException if the delete fails
     */
    public void deleteColumn(String tablename, String rowkey, String columnFamily, String column)
            throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowkey))
                .addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            table.delete(delete);
        }
    }

    /**
     * Reads one row and renders every cell as a {@code row\tfamily\tqualifier\tvalue\n} line.
     *
     * @param tablename table name
     * @param rowKey    row key
     * @return the rendered cells, or an empty string when the row has no cells
     * @throws IOException if the read fails
     */
    public static String selectRow(String tablename, String rowKey) throws IOException {
        StringBuilder record = new StringBuilder();
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            Result rs = table.get(new Get(Bytes.toBytes(rowKey)));
            appendCells(record, rs);
        }
        return record.toString();
    }

    /**
     * Reads the value of a single column of a row.
     *
     * @param tablename    table name
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @return the value decoded as a string, or {@code null} when the cell does not exist
     * @throws IOException if the read fails
     */
    public static String selectValue(String tablename, String rowKey, String columnFamily, String column)
            throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        try (Table table = connection.getTable(TableName.valueOf(tablename))) {
            Result rs = table.get(get);
            return Bytes.toString(rs.value());
        }
    }

    /**
     * Scans the whole table and renders every cell as a
     * {@code row\tfamily\tqualifier\tvalue\n} line.
     *
     * @param tablename table name
     * @return the rendered cells of all rows, possibly empty
     * @throws IOException if the scan fails
     */
    public String scanAllRecord(String tablename) throws IOException {
        StringBuilder record = new StringBuilder();
        try (Table table = connection.getTable(TableName.valueOf(tablename));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                appendCells(record, result);
            }
        }
        return record.toString();
    }

    /**
     * Scans for rows whose row key contains the given keyword.
     *
     * @param tablename  table name
     * @param rowKeyword substring the row key must contain
     * @return one list element per matching row (currently a {@code null} placeholder, see TODO)
     * @throws IOException if the scan fails
     */
    public List scanReportDataByRowKeyword(String tablename, String rowKeyword) throws IOException {
        Scan scan = new Scan();
        // Row-key filter: match rows whose key contains the keyword.
        scan.setFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyword)));
        return collectRows(tablename, scan);
    }

    /**
     * Scans for rows whose row key contains the given keyword, limited to a timestamp range.
     *
     * @param tablename  table name
     * @param rowKeyword substring the row key must contain
     * @param minStamp   lower timestamp bound passed to {@code Scan.setTimeRange}; must not be {@code null}
     * @param maxStamp   upper timestamp bound passed to {@code Scan.setTimeRange}; must not be {@code null}
     * @return one list element per matching row (currently a {@code null} placeholder, see TODO)
     * @throws IOException if the scan fails
     */
    public List scanReportDataByRowKeywordTimestamp(String tablename, String rowKeyword, Long minStamp,
                                                    Long maxStamp) throws IOException {
        Scan scan = new Scan();
        // Restrict the scan to the requested time range.
        scan.setTimeRange(minStamp, maxStamp);
        scan.setFilter(new RowFilter(CompareOperator.EQUAL, new SubstringComparator(rowKeyword)));
        return collectRows(tablename, scan);
    }

    /**
     * Disables and deletes a table if it exists; does nothing otherwise.
     *
     * @param tablename table name
     * @throws IOException if an admin call fails
     */
    public void deleteTable(String tablename) throws IOException {
        TableName name = TableName.valueOf(tablename);
        if (admin.tableExists(name)) {
            admin.disableTable(name);
            admin.deleteTable(name);
        }
    }

    /** Runs the scan and adds one placeholder element to the returned list per result row. */
    private static List<Object> collectRows(String tablename, Scan scan) throws IOException {
        List<Object> list = new ArrayList<Object>();
        try (Table table = connection.getTable(TableName.valueOf(tablename));
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                // TODO map `result` to a business object here.
                list.add(null);
            }
        }
        return list;
    }

    /** Appends every cell of the result as "row TAB family TAB qualifier TAB value NEWLINE". */
    private static void appendCells(StringBuilder out, Result result) {
        // rawCells() is null for an empty result (e.g. a Get on a missing row).
        if (result == null || result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            out.append(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()))
                    .append("\t")
                    .append(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))
                    .append("\t")
                    .append(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
                            cell.getQualifierLength()))
                    .append("\t")
                    .append(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()))
                    .append("\n");
        }
    }
}
SpringContextHolder
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43package com.springboot.hbase.config.hbase; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * Spring的ApplicationContext的持有者,可以用静态方法的方式获取spring容器中的bean */ @Component public class SpringContextHolder implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { SpringContextHolder.applicationContext = applicationContext; } public static ApplicationContext getApplicationContext() { assertApplicationContext(); return applicationContext; } @SuppressWarnings("unchecked") public static <T> T getBean(String beanName) { assertApplicationContext(); return (T) applicationContext.getBean(beanName); } public static <T> T getBean(Class<T> requiredType) { assertApplicationContext(); return applicationContext.getBean(requiredType); } private static void assertApplicationContext() { if (SpringContextHolder.applicationContext == null) { throw new RuntimeException("applicaitonContext属性为null,请检查是否注入了SpringContextHolder!"); } } }
application.properties
复制代码
hbase.conf.confMaps.hbase.zookeeper.quorum=vm-16-8-centos:2181
HBaseApplicationTests
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.springboot.hbase;

import com.springboot.hbase.config.hbase.HBaseUtils;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
 * Integration test that needs a reachable HBase instance: it loads the Spring context
 * and scans one table through {@link HBaseUtils}.
 */
@SpringBootTest
@Log4j2
class HBaseApplicationTests {

    @Autowired
    private HBaseUtils hBaseUtils;

    /**
     * Scans the {@code SYSTEM.TASK} table and prints its content.
     *
     * <p>Despite its name this test only reads; it does not delete anything. An
     * {@link IOException} from the scan is allowed to propagate so that a failed scan
     * fails the test instead of being printed and ignored.
     *
     * @throws IOException if the scan fails
     */
    @Test
    void deleteTable() throws IOException {
        Map<String, Object> map = new HashMap<String, Object>();

        String str = hBaseUtils.scanAllRecord("SYSTEM.TASK"); // scan the table
        assertNotNull(str);
        System.out.println("获取到hbase的内容:" + str);
        map.put("hbaseContent", str);

        System.out.println(map);
    }
}
执行测试类
成功如下
到此这篇关于springboot 整合 hbase的文章就介绍到这了,更多相关springboot 整合 hbase内容请搜索靠谱客以前的文章或继续浏览下面的相关文章希望大家以后多多支持靠谱客!
最后
以上就是不安小伙最近收集整理的关于聊聊springboot 整合 hbase的问题的全部内容,更多相关聊聊springboot 整合 hbase内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复