First, let's look at what a GET request and its response look like (example from the official documentation):
```
GET twitter/_doc/0

{
    "_index" : "twitter",
    "_type" : "_doc",
    "_id" : "0",
    "_version" : 1,
    "_seq_no" : 10,
    "_primary_term" : 1,
    "found": true,
    "_source" : {
        "user" : "kimchy",
        "date" : "2009-11-15T14:12:12",
        "likes": 0,
        "message" : "trying out Elasticsearch"
    }
}
```
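For reference, here is a minimal sketch that builds the same request from Java with the JDK's java.net.http client. It assumes an unsecured node listening on http://localhost:9200. GetRequestExample and buildGet are illustrative names and are not part of any Elasticsearch client. The index and id are not URL-encoded in this sketch.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative helper (hypothetical name); assumes an unsecured node on localhost:9200.
class GetRequestExample {
    static HttpRequest buildGet(String index, String id) {
        // Typeless endpoint /{index}/_doc/{id}, the same path as in the example above
        return HttpRequest.newBuilder(URI.create("http://localhost:9200/" + index + "/_doc/" + id))
                .GET()
                .build();
    }
}
```

To send it, pass the request to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` against a running node. The response body should have the shape of the JSON above.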
The analysis starts from this request.
- When Elasticsearch receives this request, org.elasticsearch.rest.RestController#dispatchRequest() looks for a Handler that matches it. RestGetAction is the matching Handler, and it defines the GET request formats it accepts:
```java
public RestGetAction(final Settings settings, final RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/{index}/_doc/{id}", this);
    controller.registerHandler(HEAD, "/{index}/_doc/{id}", this);

    // Deprecated typed endpoints.
    controller.registerHandler(GET, "/{index}/{type}/{id}", this);
    controller.registerHandler(HEAD, "/{index}/{type}/{id}", this);
}
```
```java
public class ActionModule extends AbstractModule {
    public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
        // ······
        registerHandler.accept(new RestGetAction(settings, restController));
        // ······
    }
}
```
This Handler is registered while the node is initializing, in ActionModule#initRestHandlers.
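The dispatch step can be illustrated with a minimal sketch of template matching. It assumes a simplified router: each registered template, such as /{index}/_doc/{id}, is split on '/'. Literal segments must match exactly, and {name} segments capture path parameters. SimpleRouter, register and dispatch are hypothetical names. Elasticsearch's RestController uses its own path-trie structure and is more involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified router; NOT Elasticsearch's RestController/PathTrie.
class SimpleRouter {
    // A registered route: HTTP method, template split into segments, handler name
    static final class Route {
        final String method;
        final String[] segments;
        final String handler;

        Route(String method, String template, String handler) {
            this.method = method;
            this.segments = split(template);
            this.handler = handler;
        }
    }

    // A successful match: the chosen handler plus the captured {placeholders}
    static final class Match {
        final String handler;
        final Map<String, String> params;

        Match(String handler, Map<String, String> params) {
            this.handler = handler;
            this.params = params;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    void register(String method, String template, String handler) {
        routes.add(new Route(method, template, handler));
    }

    // Returns the matching route with the most literal segments, or null if none match.
    Match dispatch(String method, String path) {
        String[] parts = split(path);
        Match best = null;
        int bestLiterals = -1;
        for (Route r : routes) {
            if (!r.method.equals(method) || r.segments.length != parts.length) {
                continue;
            }
            Map<String, String> params = new HashMap<>();
            int literals = 0;
            boolean ok = true;
            for (int i = 0; i < parts.length && ok; i++) {
                String seg = r.segments[i];
                if (seg.startsWith("{") && seg.endsWith("}")) {
                    params.put(seg.substring(1, seg.length() - 1), parts[i]); // capture value
                } else if (seg.equals(parts[i])) {
                    literals++;                                             // literal matched
                } else {
                    ok = false;                                             // literal mismatch
                }
            }
            // Preferring literals makes /{index}/_doc/{id} win over /{index}/{type}/{id}.
            if (ok && literals > bestLiterals) {
                best = new Match(r.handler, params);
                bestLiterals = literals;
            }
        }
        return best;
    }

    private static String[] split(String path) {
        return (path.startsWith("/") ? path.substring(1) : path).split("/");
    }
}
```

The literal-first preference is a choice made for this sketch. Both the _doc endpoint and the deprecated typed endpoint fit /twitter/_doc/0, so the sketch needs a rule to resolve them deterministically. A typed path such as /twitter/tweet/0 only fits the typed template.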
- Once the Handler is found, org.elasticsearch.rest.BaseRestHandler#handleRequest runs. It first calls org.elasticsearch.rest.action.document.RestGetAction#prepareRequest to process the request:
```java
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    GetRequest getRequest;
    // Build the GetRequest; if a type is given, log a deprecation warning
    if (request.hasParam("type")) {
        deprecationLogger.deprecatedAndMaybeLog("get_with_types", TYPES_DEPRECATION_MESSAGE);
        getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
    } else {
        getRequest = new GetRequest(request.param("index"), request.param("id"));
    }

    // Read the refresh, routing, preference and realtime parameters
    getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
    getRequest.routing(request.param("routing"));
    getRequest.preference(request.param("preference"));
    getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
    // [fields] is no longer supported; use [stored_fields] instead
    if (request.param("fields") != null) {
        throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
            "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
    }
    // Read stored_fields
    final String fieldsParam = request.param("stored_fields");
    if (fieldsParam != null) {
        final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
        if (fields != null) {
            getRequest.storedFields(fields);
        }
    }

    getRequest.version(RestActions.parseVersion(request));
    getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

    // Which _source fields to include or exclude
    getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));

    return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
        @Override
        protected RestStatus getStatus(final GetResponse response) {
            return response.isExists() ? OK : NOT_FOUND;
        }
    });
}
```
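prepareRequest returns a RestChannelConsumer. This is a functional interface whose body specifies what will actually be executed: client.get(). Back in handleRequest, this object is obtained first. The parameters and the request body are then validated, and finally the consumer is invoked: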
```java
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
    // prepare the request for execution; has the side effect of touching the request parameters
    // (this returns the functional interface built by prepareRequest)
    final RestChannelConsumer action = prepareRequest(request, client);

    // validate unconsumed params, but we must exclude params used to format the response
    // use a sorted set so the unconsumed parameters appear in a reliable sorted order
    final SortedSet<String> unconsumedParams =
        request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

    // validate the non-response params
    if (!unconsumedParams.isEmpty()) {
        final Set<String> candidateParams = new HashSet<>();
        candidateParams.addAll(request.consumedParams());
        candidateParams.addAll(responseParams());
        throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
    }

    if (request.hasContent() && request.isContentConsumed() == false) {
        throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
    }

    usageCount.increment();
    // execute the action
    // (runs the method defined inside the consumer, i.e. client.get(...))
    action.accept(channel);
}
```
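The contract between prepareRequest and handleRequest can be sketched under simplifying assumptions:

- A request object records which parameters have been read.
- prepare reads what it needs and returns a deferred action. A Consumer stands in for RestChannelConsumer.
- handle rejects the request if any parameter was never read, and only then runs the action.

FakeRequest, PrepareThenExecute, prepare and handle are hypothetical names. The real BaseRestHandler also excludes response-formatting parameters and builds a more detailed error message.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical stand-in for RestRequest: remembers which params were read.
class FakeRequest {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    FakeRequest(Map<String, String> params) {
        this.params = params;
    }

    String param(String key) {
        consumed.add(key); // reading a parameter marks it as consumed
        return params.get(key);
    }

    // Parameters present in the request that nobody read, in sorted order
    TreeSet<String> unconsumedParams() {
        TreeSet<String> result = new TreeSet<>(params.keySet());
        result.removeAll(consumed);
        return result;
    }
}

class PrepareThenExecute {
    // Like prepareRequest: parse now, return the work to run later.
    static Consumer<List<String>> prepare(FakeRequest request) {
        String index = request.param("index");
        String id = request.param("id");
        return channel -> channel.add("get " + index + "/" + id);
    }

    // Like handleRequest: validate unconsumed params, then run the deferred action.
    static void handle(FakeRequest request, List<String> channel) {
        Consumer<List<String>> action = prepare(request);
        TreeSet<String> unconsumed = request.unconsumedParams();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        action.accept(channel);
    }
}
```

Because parsing happens before validation, a misspelled parameter is reported as "unrecognized". It is not silently ignored.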
- The method that actually runs is org.elasticsearch.client.node.NodeClient#doExecute -> executeLocally(). Here is that method:
```java
public <    Request extends ActionRequest,
            Response extends ActionResponse
        > Task executeLocally(Action<Response> action, Request request, ActionListener<Response> listener) {
    return transportAction(action).execute(request, listener);
}
```
It first looks up the transport action for this request. For GET, the action is GetAction.INSTANCE, which maps to TransportGetAction. This mapping is also registered at node startup:
```java
actions.register(GetAction.INSTANCE, TransportGetAction.class);
```
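Conceptually, the registry behind executeLocally maps an action to the transport action instance that serves it. GetAction's name is indices:data/read/get. The sketch below models only that lookup. ActionRegistry, SimpleTransportAction and the Consumer-based listener are hypothetical simplifications, not the real ActionModule or NodeClient API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified model of the action registry; not the real ActionModule/NodeClient API.
class ActionRegistry {
    // Stand-in for a transport action: handles a request and reports the result to a listener.
    interface SimpleTransportAction<Req, Resp> {
        void execute(Req request, Consumer<Resp> listener);
    }

    private final Map<String, SimpleTransportAction<?, ?>> actions = new HashMap<>();

    // Like actions.register(...) at node startup; duplicates are rejected in this sketch.
    <Req, Resp> void register(String actionName, SimpleTransportAction<Req, Resp> action) {
        if (actions.putIfAbsent(actionName, action) != null) {
            throw new IllegalArgumentException("action [" + actionName + "] already registered");
        }
    }

    // Like NodeClient#executeLocally: look up the transport action, then run it.
    @SuppressWarnings("unchecked")
    <Req, Resp> void executeLocally(String actionName, Req request, Consumer<Resp> listener) {
        SimpleTransportAction<Req, Resp> action = (SimpleTransportAction<Req, Resp>) actions.get(actionName);
        if (action == null) {
            throw new IllegalStateException("failed to find action [" + actionName + "] to execute");
        }
        action.execute(request, listener);
    }
}
```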
With the action in hand, its execute method is called. TransportGetAction extends TransportSingleShardAction, and execute is inherited from the base class org.elasticsearch.action.support.TransportAction#execute. Here is what it does:
```java
public final void execute(Task task, Request request, ActionListener<Response> listener) {
    ActionRequestValidationException validationException = request.validate();
    if (validationException != null) {
        listener.onFailure(validationException);
        return;
    }

    if (task != null && request.getShouldStoreResult()) {
        listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
    }

    // Build a filter chain: registered filters (e.g. from plugins) run before the action itself
    RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
    requestFilterChain.proceed(task, actionName, request, listener);
}

public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            // run the next filter (plugin logic)
            this.action.filters[i].apply(task, actionName, request, listener, this);
        } else if (i == this.action.filters.length) {
            // all filters have run: execute the action's own logic
            this.action.doExecute(task, request, listener);
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch(Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}
```
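The index counter in proceed() lets each filter either pass the request on by calling proceed again, or short-circuit it by answering the listener itself. Below is a simplified, single-threaded sketch of the same idea. FilterChainSketch, Filter, Action and Chain are hypothetical names. Unlike the real chain, this sketch throws exceptions instead of routing them to listener.onFailure.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified filter chain modelled on RequestFilterChain#proceed.
class FilterChainSketch {
    interface Filter {
        void apply(String request, Consumer<String> listener, Chain chain);
    }

    interface Action {
        void doExecute(String request, Consumer<String> listener);
    }

    static final class Chain {
        private final List<Filter> filters;
        private final Action action;
        private final AtomicInteger index = new AtomicInteger();

        Chain(List<Filter> filters, Action action) {
            this.filters = filters;
            this.action = action;
        }

        void proceed(String request, Consumer<String> listener) {
            int i = index.getAndIncrement();
            if (i < filters.size()) {
                filters.get(i).apply(request, listener, this);  // hand over to the next filter
            } else if (i == filters.size()) {
                action.doExecute(request, listener);             // every filter passed it on
            } else {
                throw new IllegalStateException("proceed was called too many times");
            }
        }
    }
}
```

A filter that never calls chain.proceed ends the request early. This is how the deny filter in the test below short-circuits.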
Next, the action's doExecute method runs. For TransportGetAction, doExecute is implemented in TransportSingleShardAction:
```java
// Creates an AsyncSingleAction object and calls its start method
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new AsyncSingleAction(request, listener).start();
}
```
This creates a new AsyncSingleAction. First, look at its constructor:
```java
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    // ······
    this.shardIt = shards(clusterState, internalRequest);
}
```
The important step here is resolving which shards the request targets. shards() is an abstract method that each subclass must implement:
```java
/**
 * Returns the candidate shards to execute the operation on or <code>null</code> the execute
 * the operation locally (the node that received the request)
 */
@Nullable
protected abstract ShardsIterator shards(ClusterState state, InternalRequest request);
```
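The Javadoc means the following. Returning null tells the action to execute locally, on the node that received the request. Otherwise the method returns the candidate shards to run on. The result is an iterator, so if execution fails on one shard copy, the next copy can be tried. Once the object is constructed, start is called: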
```java
public void start() {
    if (shardIt == null) {
        // just execute it on the local node
        final Writeable.Reader<Response> reader = getResponseReader();
        // shardIt is null: send the request to the local node
        transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
            new TransportResponseHandler<Response>() {
                @Override
                public Response read(StreamInput in) throws IOException {
                    return reader.read(in);
                }

                @Override
                public String executor() {
                    return ThreadPool.Names.SAME;
                }

                @Override
                public void handleResponse(final Response response) {
                    listener.onResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    listener.onFailure(exp);
                }
            });
    } else {
        // otherwise pick a shard copy and send the request to the node that holds it
        perform(null);
    }
}
```
Now look at perform(). When a request fails, onFailure is called:
```java
private void perform(@Nullable final Exception currentFailure) {
    // ······
    // Pick the next shard copy; after a failure, the next copy is used for the retry
    final ShardRouting shardRouting = shardIt.nextOrNull();
    // ·····
    // Find the node that holds this shard copy
    DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
    if (node == null) {
        onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
    } else {
        // ······
        final Writeable.Reader<Response> reader = getResponseReader();
        transportService.sendRequest(node, transportShardAction, internalRequest.request(),
            new TransportResponseHandler<Response>() {
                // ······
                @Override
                public void handleException(TransportException exp) {
                    onFailure(shardRouting, exp);
                }
            });
    }
}

// On failure, perform() is called again, until one copy succeeds or every copy has failed
private void onFailure(ShardRouting shardRouting, Exception e) {
    if (e != null) {
        logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting,
            internalRequest.request()), e);
    }
    perform(e);
}
```
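Together, perform() and onFailure() form a retry loop over shard copies. Below is a minimal synchronous sketch. It assumes each copy is identified by a node name and that a failed send surfaces as a RuntimeException. The real code is asynchronous and reports failures through transport callbacks. ShardRetrySketch and its fields are hypothetical.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch of AsyncSingleAction's retry loop: try each shard copy in turn.
class ShardRetrySketch {
    private final Iterator<String> shardCopies;          // stand-in for ShardsIterator
    private final Function<String, String> sendToCopy;   // throws on failure, like a failed transport call
    private final Consumer<String> onResponse;
    private final Consumer<Exception> onFailure;
    private Exception lastFailure;

    ShardRetrySketch(List<String> copies, Function<String, String> sendToCopy,
                     Consumer<String> onResponse, Consumer<Exception> onFailure) {
        this.shardCopies = copies.iterator();
        this.sendToCopy = sendToCopy;
        this.onResponse = onResponse;
        this.onFailure = onFailure;
    }

    void perform(Exception currentFailure) {
        if (currentFailure != null) {
            lastFailure = currentFailure;
        }
        if (!shardCopies.hasNext()) {
            // every copy failed: report the last failure (or a generic one if none was recorded)
            onFailure.accept(lastFailure != null ? lastFailure : new IllegalStateException("no shard copies"));
            return;
        }
        String copy = shardCopies.next();
        String response;
        try {
            response = sendToCopy.apply(copy);
        } catch (RuntimeException e) {
            perform(e);   // like onFailure(shardRouting, e) -> perform(e)
            return;
        }
        onResponse.accept(response); // outside the try, so listener errors do not trigger retries
    }
}
```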
- The next step is transportService.sendRequest(node, transportShardAction ·····). The transportShardAction string is set when TransportGetAction is constructed, in the constructor of its parent class TransportSingleShardAction:
```java
protected TransportSingleShardAction(String actionName, ThreadPool threadPool, ClusterService clusterService,
                                     TransportService transportService, ActionFilters actionFilters,
                                     IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
                                     String executor) {
    super(actionName, actionFilters, transportService.getTaskManager());
    this.threadPool = threadPool;
    this.clusterService = clusterService;
    this.transportService = transportService;
    this.indexNameExpressionResolver = indexNameExpressionResolver;

    // The transportShardAction string is defined here
    this.transportShardAction = actionName + "[s]";
    this.executor = executor;

    // Register a Handler that other clients can call
    if (!isSubAction()) {
        transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
    }
    // Register the Handler for transportShardAction, i.e. the Handler the request above ends up in
    transportService.registerRequestHandler(transportShardAction, request, ThreadPool.Names.SAME, new ShardTransportHandler());
}
```
transportService.sendRequest may send the request to the local node or to another node. Either way, it ultimately calls the messageReceived method of the Handler registered under that action name (transportShardAction):
```java
// Handler executed for action[s]
private class ShardTransportHandler implements TransportRequestHandler<Request> {

    @Override
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
        }
        asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
    }
}
```
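Handlers are looked up by action name. The sender therefore does not need to know which class will run the request. It only needs the name, which here is the original action name plus the [s] suffix. Below is a toy in-process model, assuming string requests and responses. MiniTransport and its methods are hypothetical, and the sketch does no serialization or networking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, in-process model of TransportService handler registration and dispatch.
class MiniTransport {
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    void registerRequestHandler(String action, Function<String, String> handler) {
        if (handlers.putIfAbsent(action, handler) != null) {
            throw new IllegalArgumentException("transport handler for [" + action + "] already registered");
        }
    }

    // In this sketch, local and "remote" sends both end in the handler registered
    // under the action name; a real transport would serialize the request for remote nodes.
    String sendRequest(String node, String action, String request) {
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalStateException("no handler for action [" + action + "] on " + node);
        }
        return handler.apply(request);
    }

    static String shardActionName(String actionName) {
        return actionName + "[s]";   // same suffix rule as TransportSingleShardAction
    }
}
```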
The method that finally runs is shardOperation, which each subclass of TransportSingleShardAction implements:
```java
// Also abstract: each concrete Action implements it
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
```
So, to find the code that finally serves a GET request, we go to TransportGetAction (org.elasticsearch.action.get.TransportGetAction#shardOperation):
```java
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    // If a refresh was requested and this is not a realtime get, refresh the shard first
    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }

    // Fetch the result using the document information carried by the request
    GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}
```
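The split between TransportSingleShardAction and TransportGetAction follows the template-method pattern. The base class controls where the shard-level work is scheduled, and the subclass supplies shardOperation. Below is a simplified sketch under that reading. SingleShardActionSketch, GetRequestSketch and GetActionSketch are hypothetical names, and the Executor stands in for the thread pool. The "refresh" is only recorded, not performed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical sketch of the template-method split; not the real TransportSingleShardAction.
abstract class SingleShardActionSketch<Req, Resp> {
    private final Executor executor;

    SingleShardActionSketch(Executor executor) {
        this.executor = executor;
    }

    // The base class decides where the work runs ...
    void asyncShardOperation(Req request, int shardId, Consumer<Resp> listener) {
        executor.execute(() -> listener.accept(shardOperation(request, shardId)));
    }

    // ... and the concrete action decides what the work is.
    protected abstract Resp shardOperation(Req request, int shardId);
}

class GetRequestSketch {
    final String id;
    final boolean refresh;
    final boolean realtime;

    GetRequestSketch(String id, boolean refresh, boolean realtime) {
        this.id = id;
        this.refresh = refresh;
        this.realtime = realtime;
    }
}

class GetActionSketch extends SingleShardActionSketch<GetRequestSketch, String> {
    final List<String> refreshLog = new ArrayList<>();

    GetActionSketch(Executor executor) {
        super(executor);
    }

    @Override
    protected String shardOperation(GetRequestSketch request, int shardId) {
        // Mirrors the check in TransportGetAction#shardOperation: only a non-realtime get
        // with refresh=true triggers an explicit refresh here (recorded, not performed).
        if (request.refresh && !request.realtime) {
            refreshLog.add("refresh_flag_get on shard " + shardId);
        }
        return "doc " + request.id + " from shard " + shardId;
    }
}
```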
- Fetching the document eventually reaches org.elasticsearch.index.engine.InternalEngine#get, which performs the actual GET:
```java
// The actual document read
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
    // ·······
    refresh("realtime_get", SearcherScope.INTERNAL, true);
    // ······
    // no version, get the version from the index, we know that we refresh on flush
    // Read the data through a searcher, i.e. fetch the document through the Lucene API
    return getFromSearcher(get, searcherFactory, scope);
}
```
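In older versions of ES, a GET request read the latest version of the document from the translog to guarantee it returned the newest data. In current versions it relies on a refresh instead. The refresh writes the in-memory data into a segment, so the following steps can read it through Lucene.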
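The refresh-based real-time behaviour can be shown with a toy model built on two maps:

- an indexing buffer for documents written since the last refresh;
- a "segments" map for what searchers can see.

The model captures only the visibility rule. A realtime get refreshes first when the requested document has unrefreshed changes. A non-realtime get sees only refreshed data. ToyEngine is hypothetical and does not reflect Lucene's actual structure.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy engine: not Lucene or InternalEngine, just the visibility rules.
class ToyEngine {
    private final Map<String, String> indexingBuffer = new HashMap<>(); // written, not yet searchable
    private final Map<String, String> segments = new HashMap<>();       // visible to searchers
    private int refreshCount = 0;

    void index(String id, String source) {
        indexingBuffer.put(id, source);
    }

    void refresh() {
        segments.putAll(indexingBuffer);  // stands in for writing a new segment
        indexingBuffer.clear();
        refreshCount++;
    }

    // realtime = true: if the doc has unrefreshed changes, refresh first, then read.
    // realtime = false: read only what the last refresh made visible.
    String get(String id, boolean realtime) {
        if (realtime && indexingBuffer.containsKey(id)) {
            refresh();
        }
        return segments.get(id);
    }

    int refreshCount() {
        return refreshCount;
    }
}
```

In the toy, each refresh makes all pending writes visible at once. This suggests why frequent realtime gets on freshly written documents can produce many small segments and slow down indexing, as noted later in this article.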
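At this point the GET request has its data. MultiGet follows essentially the same flow. As with bulk requests, the items that target the same shard are grouped together and executed as one shard-level request. Each shard-level request then goes through the steps above.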
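MultiGet's grouping step can be sketched as bucketing item positions by target shard. Each shard then receives one request, and the responses can be slotted back in their original order. The sketch assumes routing by document id. Elasticsearch hashes the routing value with Murmur3; String.hashCode is substituted here only to keep the example self-contained, so shard numbers will differ from a real cluster. MultiGetGrouping is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of grouping multi-get items by target shard.
class MultiGetGrouping {
    // Stand-in for shard routing; String.hashCode replaces Murmur3 for self-containment.
    static int shardFor(String routing, int numberOfShards) {
        return Math.floorMod(routing.hashCode(), numberOfShards);
    }

    // Returns shard id -> positions of the items that go to that shard, so each shard
    // gets one request and responses can be put back in the original order.
    static Map<Integer, List<Integer>> groupByShard(List<String> ids, int numberOfShards) {
        Map<Integer, List<Integer>> byShard = new TreeMap<>();
        for (int pos = 0; pos < ids.size(); pos++) {
            int shard = shardFor(ids.get(pos), numberOfShards);
            byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(pos);
        }
        return byShard;
    }
}
```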
Thoughts
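- In ES, "real-time" means that GET and MGET return the latest data. It does not apply to search or aggregations.
- Since 5.x, real-time reads are implemented with refresh, which can slow down indexing.
- An update first performs a GET and then writes. To guarantee consistency it forces realtime to true, so an update may trigger a refresh and therefore create a new segment.
- If a read fails on one shard copy, it is retried on another copy (replica).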
References
- Zhang Chao, 《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice)
- https://www.elastic.co/guide/en/elasticsearch/reference/7.2/docs-get.html