我是靠谱客的博主 娇气早晨,这篇文章主要介绍Elasticsearch基础—— API介绍(JAVA)Elasticsearch JAVA API,现在分享给大家,希望可以做个参考。

Elasticsearch JAVA API

下面内容是,官方文档的整理,想了解详细的内容可以访问官方文档:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

客户端连接

Elasticsearch Java API 存在多种Client连接方式

  • TransportClient和RestClient是Elasticsearch原生的api
  • Spring Data Elasticsearch 为Spring整合的连接方式
  • Jest是Java社区开发的,是Elasticsearch的Java Http Rest客户端

目前几种连接方式:
TransportClient 后续版本中会被弃用

Jest因为是社区维护存在一定的延迟

Spring Data Elasticsearch为Spring整合的内容主要是为了配合Spring生态

Rest Client,官方维护

TransportClient 连接及操作

因为后续会被移除,所以只做简单了解,并没有介绍其他操作

依赖

复制代码
1
2
3
4
5
6
7
8
9
10
11
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version>7.1.1</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.1.1</version> </dependency>

创建连接的代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public TransportClient getClient(){ TransportClient client = null; try { Settings settings = Settings.builder() .put("client.transport.sniff", true) .put("cluster.name", "name").build(); client = new PreBuiltTransportClient(settings) .addTransportAddress(new TransportAddress(new InetSocketAddress("192.168.1.1", 9300))); } catch (Exception e) { e.printStackTrace(); } return client; }

RestClient 连接及操作

因为现在多是使用此方式进行ES的连接,会主要介绍其相关数据操作的内容

客户端连接

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.1.1</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.1.1</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.1.1</version> </dependency>

创建连接的代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RestHighLevelClient getClient() { RestHighLevelClient client = null; try { client = new RestHighLevelClient( RestClient.builder( new HttpHost("192.168.1.1", 9300, "http") ) ); } catch (Exception e) { e.printStackTrace(); } return client; }

Document APIs

Document APIs主要涉及些增删改查等操作,包括单条操作和多条操作

插入记录

Elasticsearch的数据插入主要是使用IndexRequest对象。目前Elasticsearch提供四种方式创建IndexRequest对象。

使用JSON形式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** * 使用JSON形式 * @return * @throws IOException */ public IndexRequest IndexString() throws IOException { // 添加文档的请求 // 三个参数 索引 文档id IndexRequest request = new IndexRequest("index", "id"); String jsonString = "{" + ""name":"key"," + ""age":"10"," + ""des":"this data"" + "}"; // 以字符串形式提供的数据 request.source(jsonString, XContentType.JSON); return request; }

使用map形式

复制代码
1
2
3
4
5
6
7
8
9
10
public IndexRequest IndexMap() { //第二种方式: Map Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("name", "key"); jsonMap.put("date", new Date()); jsonMap.put("des", "this data"); IndexRequest indexRequest = new IndexRequest("index", "id").source(jsonMap); return indexRequest; }

使用Builder建造器

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/** * 使用建造器 * @return * @throws IOException */ public IndexRequest indexBuilder() throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); builder.field("name", "key"); builder.timeField("date" , new Date()); builder.field("des", "this data"); builder.endObject(); IndexRequest indexRequest1 = new IndexRequest("index", "id") .source(builder); return indexRequest1; }

使用key-value形式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** * 使用key-value * @return * @throws IOException */ public IndexRequest indexSource() throws IOException { IndexRequest indexRequest2 = new IndexRequest("index", "id") .source("name", "key", "date", new Date(), "des", "this data" ); return indexRequest2; }

在完成IndexRequest创建之后需要进行文档插入的操作,目前ES提供了同步和异步的方式插入数据

同步的插入数据

复制代码
1
2
3
//同步执行 IndexResponse indexResponse = RestClientUtils.client.index(request, RequestOptions.DEFAULT);

异步的插入数据

异步插入数据需要配置一个实现ActionListener接口的监听器,以方便监听结果的返回。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//异步执行 ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() { @Override public void onResponse(IndexResponse indexResponse) { } @Override public void onFailure(Exception e) { } }; RestClientUtils.client.indexAsync(request, RequestOptions.DEFAULT, listener);

除了上面最基本的参数之外,ES也提供了更多的参数来实现不同的业务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
IndexRequest request = new IndexRequest(); // 路由键 request.routing("routing"); // 设置超时 request.timeout(TimeValue.timeValueSeconds(1)); // 字符串形式的超时 request.timeout("1s"); // 设置刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 字符串形式的刷新策略 request.setRefreshPolicy("wait_for"); // 版本设置 request.version(2); // 文档类型 request.versionType(VersionType.EXTERNAL); // 操作类型 request.opType(DocWriteRequest.OpType.CREATE); // 操作类型 request.opType("create"); // 设置文档获取通道名称 request.setPipeline("pipeline");

获取数据(文档)

这里主要介绍的是单一数据的获取

Elasticsearch的数据获取主要是使用GetRequest对象。和插入数据类似,获取数据ES同样提供了同步和异步的方法;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/** * 根据 id 获取数据 * @throws Exception */ public void get() throws Exception{ GetRequest request = new GetRequest("index", "1"); //同步执行 GetResponse getResponse = RestClientUtils.client.get(request, RequestOptions.DEFAULT); //异步执行 ActionListener<GetResponse> listener = new ActionListener<GetResponse>() { @Override public void onResponse(GetResponse documentFields) { } @Override public void onFailure(Exception e) { } }; //异步执行 listener的写法参照Index的异步执行的listener RestClientUtils.client.getAsync(request, RequestOptions.DEFAULT, listener); //Get Response 获取信息 //抛出异常 }

GetRequest同样提供了更多的参数,来丰富其业务

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 禁用源检索,默认情况下启用 request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); // 为特定字段配置源包含 String[] includes = new String[]{"message", "*Date"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.fetchSourceContext(fetchSourceContext); // 为特定字段配置源排除 String[] includes2 = Strings.EMPTY_ARRAY; String[] excludes2 = new String[]{"message"}; FetchSourceContext fetchSourceContext2 = new FetchSourceContext(true, includes2, excludes2); request.fetchSourceContext(fetchSourceContext2); // 配置路由 request.routing("routing"); // 配置偏好值 request.preference("preference"); // 将realtime设置为false request.realtime(false); // 在检索文档之前执行刷新(默认为false) request.refresh(true); // 版本 request.version(2); // 版本类型 request.versionType(VersionType.EXTERNAL);

数据(文档)验证

验证数据的存在和获取数据都是使用GetRequest对象进行操作,不过因为数据验证只会返回true或者false

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void exists() throws Exception{ GetRequest getRequest = new GetRequest("index", "1"); //同步执行 boolean exists = RestClientUtils.client.exists(getRequest, RequestOptions.DEFAULT); //异步执行 ActionListener<Boolean> listener = new ActionListener<Boolean>() { @Override public void onResponse(Boolean aBoolean) { } @Override public void onFailure(Exception e) { } }; //异步执行 listener的写法参照Index的 RestClientUtils.client.existsAsync(getRequest, RequestOptions.DEFAULT, listener); if (exists){ System.out.println("存在"); }else { System.out.println("不存在"); } }

有一点需要注意因为数据验证只会返回true和false所以,关闭对源数据的提取会消耗更少的资源

复制代码
1
2
3
// 禁用fetching _source. getRequest.fetchSourceContext(new FetchSourceContext(false));

删除数据(文档)

Elasticsearch的删除数据主要是使用DeleteResponse对象。和插入数据类似,获取数据ES同样提供了同步和异步的方法;

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/** * 根据id删除 * @throws Exception */ public void delete() throws Exception{ DeleteRequest request = new DeleteRequest("posts", "1"); //同步执行 DeleteResponse deleteResponse = RestClientUtils.client.delete(request, RequestOptions.DEFAULT); //异步执行 ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() { @Override public void onResponse(DeleteResponse deleteResponse1) { } @Override public void onFailure(Exception e) { } }; //异步执行 listener参照index的 RestClientUtils.client.deleteAsync(request, RequestOptions.DEFAULT, listener); //Delete Response String index = deleteResponse.getIndex(); // 文档未找到 if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { } //抛出异常 }

额外的参数配置

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 路由值 request.routing("routing"); // 设置超时 request.timeout(TimeValue.timeValueMinutes(2)); // 以字符串形式设置超时 request.timeout("2m"); // 设置刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 字符串设置刷新策略 request.setRefreshPolicy("wait_for"); // 版本 request.version(2); // 版本类型 request.versionType(VersionType.EXTERNAL);

更新数据(文档)

Elasticsearch的更新数据主要是使用UpdateRequest对象。和插入数据类似,ES提供了多种方式创建UpdateRequest对象

用脚本更新

复制代码
1
2
3
4
5
6
7
8
9
10
public UpdateRequest getScript1() { UpdateRequest request = new UpdateRequest("index", "1"); Map<String, Object> parameters = new HashMap<>(1); // 第一种方式:inline script Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters); request.script(inline); return request; }

存储的脚本

复制代码
1
2
3
4
5
6
7
8
public UpdateRequest getScript2() { UpdateRequest request = new UpdateRequest("index", "1"); Map<String, Object> parameters = new HashMap<>(1); Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters); request.script(stored); return request; }

用部分文档更新

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// JSON形式 public UpdateRequest getScript3() { UpdateRequest request = new UpdateRequest("index", "1"); Map<String, Object> parameters = new HashMap<>(1); String jsonString = "{" + ""updated":"2019-10-09"," + ""reason":"data update "" + "}"; // 以JSON格式的字符串形式提供的部分文档 request.doc(jsonString, XContentType.JSON); return request; } // map public UpdateRequest getScript4() { UpdateRequest request = new UpdateRequest("index", "1"); Map<String, Object> parameters = new HashMap<>(1); Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("updated", new Date()); jsonMap.put("reason", "daily update"); request.doc(jsonMap); return request; } // builder public UpdateRequest getScript5() throws IOException { UpdateRequest request = new UpdateRequest("index", "1"); Map<String, Object> parameters = new HashMap<>(1); XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); builder.timeField("updated", new Date()); builder.field("reason", "daily update"); builder.endObject(); request.doc(builder); return request; } // key-value public UpdateRequest getScript6() { UpdateRequest request = new UpdateRequest("index",, "1"); Map<String, Object> parameters = new HashMap<>(1); request.doc("updated", new Date(), "reason", "daily update"); //upserts String jsonString1 = "{"created":"2019-10-09"}"; request.upsert(jsonString1, XContentType.JSON); return request; }

执行更新的操作

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void update() throws IOException { UpdateRequest request = getScript1(); // 同步执行 UpdateResponse updateResponse = RestClientUtils.client.update(request, RequestOptions.DEFAULT); // 异步执行 ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() { @Override public void onResponse(UpdateResponse updateResponse1) { } @Override public void onFailure(Exception e) { } }; // 异步执行, listener创建参考index的 RestClientUtils.client.updateAsync(request, RequestOptions.DEFAULT, listener); // update Response GetResult result = updateResponse.getGetResult(); if (result.isExists()) { String sourceAsString = result.sourceAsString(); Map<String, Object> sourceAsMap = result.sourceAsMap(); byte[] sourceAsBytes = result.source(); } else { } //抛出异常 }

至于更新的结果我们可以根据返回的UpdateResponse的结果来获取

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
// 我们使用getResult方法的返回内容可以判断更新结果 DocWriteResponse.Result result = updateResponse.getResult(); // 第一次创建文档的情况(upsert) DocWriteResponse.Result.CREATED // 文档更新 DocWriteResponse.Result.UPDATED // 文档被删除的情况 DocWriteResponse.Result.DELETED // 文档不受更新影响的情况,即没有对文档执行任何操作(noop) DocWriteResponse.Result.NOOP

额外的参数

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 路由值 request.routing("routing"); // 超时 request.timeout(TimeValue.timeValueSeconds(1)); // 字符串设置的超时 request.timeout("1s"); // 设置刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); // 字符串形式刷新策略 request.setRefreshPolicy("wait_for"); // 更新文档在更新操作的获取和索引之间被其他操作修改,重试的次数 request.retryOnConflict(3); // 启用源检索,默认情况下禁用 request.fetchSource(true); // 为特定字段配置源包含 String[] includes = new String[]{"updated", "r*"}; String[] excludes = Strings.EMPTY_ARRAY; request.fetchSource( new FetchSourceContext(true, includes, excludes)); // 为特定字段配置源排除 String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[]{"updated"}; request.fetchSource( new FetchSourceContext(true, includes, excludes)); // 禁用noop检测 request.detectNoop(false); // 指出无论文档是否存在,脚本都必须运行,即如果文档不存在,脚本负责创建文档。 request.scriptedUpsert(true); // 指示如果部分文档尚不存在,则必须将其用作upsert文档。 request.docAsUpsert(true); // 设置在继续更新操作之前必须活动的碎片副本数量。 request.waitForActiveShards(2); // ActiveShardCount的碎片副本数。可选值:ActiveShardCount.ALL, ActiveShardCount.ONE或者 ActiveShardCount.DEFAULT request.waitForActiveShards(ActiveShardCount.ALL);

批处理

ES提供了一个功能,我们可以使用单个请求执行多个索引的更新或删除,这就需要使用到BulkRequest

复制代码
1
2
3
4
5
6
7
8
9
10
BulkRequest request = new BulkRequest(); //Other request.add(new DeleteRequest("posts", "3")); request.add(new UpdateRequest("posts", "2") .doc(XContentType.JSON, "other", "test")); request.add(new IndexRequest("posts") .source(XContentType.JSON, "field", "baz"));

我们可以使用这种方式将多个索引操作放到一个请求中。并且这些操作并不需要为同一类操作,可以是删除和更新放在一起。

批处理的执行

和上面的内容一样,其支持同步和异步两种方式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//同步执行 BulkResponse bulkResponses = RestClientUtils.client.bulk(request, RequestOptions.DEFAULT); //异步执行 ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } }; RestClientUtils.client.bulkAsync(request, RequestOptions.DEFAULT, listener);

额外的参数

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 设置超时 request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m"); // 设置刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for"); // 设置在操作前需要活动的碎片副本数量 request.waitForActiveShards(2); // 作为动态硬装载提供的碎片副本数,可选:ActiveShardCount.ALL, ActiveShardCount.ONE或 ActiveShardCount.DEFAULT request.waitForActiveShards(ActiveShardCount.ALL); // 用户子请求的全局管道 request.pipeline("pipelineId"); //用于所有子请求的全局管道标识 // 路由 request.routing("routingId"); // 设置请求的全局索引 BulkRequest defaulted = new BulkRequest("posts");

结果的处理

执行批处理后返回的BulkResponse包含有关已执行操作的信息,我们可以迭代每个结果来获取每条数据执行的内容

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//Bulk Response 批处理结果 for (BulkItemResponse bulkItemResponse: bulkResponses){ DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 索引操作或创建操作 if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; // 更新操作 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; // 删除操作 } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
批量数据获取

除了BulkResponse,ES还提供了MultiGetRequest对象,以实现一个请求,并行的处理多个查询操作

复制代码
1
2
3
4
5
6
MultiGetRequest request = new MultiGetRequest(); request.add(new MultiGetRequest.Item("index","example_id")); request.add(new MultiGetRequest.Item("index", "another_id")); //optional arguments request.add(new MultiGetRequest.Item("index", "example_id") .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));

我们可以为一个请求添加多个操作子项,并且为其配置单独的参数。

执行获取

同样ES提供了同步和异步的获取方式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//同步执行 MultiGetResponse responses = RestClientUtils.client.mget(request, RequestOptions.DEFAULT); //异步执行 ActionListener<MultiGetResponse> listener = new ActionListener<MultiGetResponse>() { @Override public void onResponse(MultiGetResponse multiGetItemResponses) { } @Override public void onFailure(Exception e) { } }; //异步执行 listener参考Index的 RestClientUtils.client.mgetAsync(request, RequestOptions.DEFAULT, listener);

结果的处理

返回的内容是一个MultiGetItemResponse列表,按请求的顺序排列在GetResponse中。其操作很类似一个普通的GetResponse。

复制代码
1
2
3
4
5
6
7
8
//Multi Get Response MultiGetItemResponse firstItem = responses.getResponses()[0]; GetResponse firstGet = firstItem.getResponse(); if (firstGet.isExists()) { // 业务 }

额外的参数设置

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 禁用源检索,默认情况下启用 request.add(new MultiGetRequest.Item("index", "example_id") .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE)); // 为特定字段配置源包含 String[] includes = new String[] {"foo", "*r"}; String[] excludes = Strings.EMPTY_ARRAY; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.add(new MultiGetRequest.Item("index", "example_id") .fetchSourceContext(fetchSourceContext)); // 为特定字段配置源排除 String[] includes = Strings.EMPTY_ARRAY; String[] excludes = new String[] {"foo", "*r"}; FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes); request.add(new MultiGetRequest.Item("index", "example_id") .fetchSourceContext(fetchSourceContext)); // 配置特定存储字段的检索(要求字段在映射中单独存储) request.add(new MultiGetRequest.Item("index", "example_id") .storedFields("foo")); // 设置路由 request.add(new MultiGetRequest.Item("index", "with_routing") .routing("some_routing")); // 设置版本 以及版本类似 request.add(new MultiGetRequest.Item("index", "with_version") .versionType(VersionType.EXTERNAL) / .version(10123L)); // preference, realtime和refresh可以在主请求上设置,但不能在任何项目上设置: // 偏好值 request.preference("some_preference"); // 将实时标志设置为false(默认true) request.realtime(false); // 在检索文档之前执行刷新(默认false) request.refresh(true);

个人水平有限,上面的内容可能存在没有描述清楚或者错误的地方,假如开发同学发现了,请及时告知,我会第一时间修改相关内容。假如我的这篇内容对你有任何帮助的话,麻烦给我点一个赞。你的点赞就是我前进的动力。

最后

以上就是娇气早晨最近收集整理的关于Elasticsearch基础—— API介绍(JAVA)Elasticsearch JAVA API的全部内容,更多相关Elasticsearch基础——内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部