我是靠谱客的博主 奋斗小笼包,这篇文章主要介绍springcloud源码之eureka-client服务发现springcloud源码之eureka-client服务发现,现在分享给大家,希望可以做个参考。
文章目录
- springcloud源码之eureka-client服务发现
- 前言
- 服务发现
- 总结
springcloud源码之eureka-client服务发现
前言
请先看服务注册,这篇博文把代码入口说清楚了,服务发现和心跳的代码在一个地方
服务发现
private int registryFetchIntervalSeconds = 30;
下面这个线程就是服务发现的,也是由一个定时器每隔30s去eureka-server拿一次数据
复制代码
1
2
3
4
5
6
7
8class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } }
refreshRegistry关键代码如下
复制代码
1
2
3
4
5
6
7//fetchRegistry其实在eureka-client初始化的时候拿过一次,上篇说了 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); }
fetchRegistry关键代码如下
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21//forceFullRegistryFetch=true 代表全量拉取 private boolean fetchRegistry(boolean forceFullRegistryFetch) { //从缓存中拿到已存的微服务信息 Applications applications = getApplications(); //如果配置了不增量拉取||配置了vip地址||applications ==null||没有一个微服务信息||之前没有拉取过 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) { //全量拉取 getAndStoreFullRegistry(); } else { //增量拉取 getAndUpdateDelta(applications); } return true; }
全量拉取
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void getAndStoreFullRegistry() throws Throwable { Applications apps = null; //访问eureka-server的接口ApplicationsResource#getContainers拉取全部的微服务实例 //eureka-sever缓存设计这个博客说了getContainers EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { apps = httpResponse.getEntity(); } //更新缓存 localRegionApps.set(this.filterAndShuffle(apps)); }
增量拉取
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private void getAndUpdateDelta(Applications applications) throws Throwable { //存储增量微服务 Applications delta = null; //去访问ApplicationsResource#getContainerDifferential接口增量拉取微服务 //这里具体服务端是如何控制哪些微服务是要被增量拉取的待会再说 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { delta = httpResponse.getEntity(); } //没有拿到增量就去再全量拉取一遍 if (delta == null) { getAndStoreFullRegistry(); } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //加锁,因为上面有网络请求耗时操作,可能这块代码会有并发 if (fetchRegistryUpdateLock.tryLock()) { try { //去更新本地缓存,根据微服务的上一次操作类型就更新 //MODIFIED和ADDED类型就执行add操作 //DELETED类型就执行remove操作 updateDelta(delta); } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } }
服务端是如何判断那些微服务属于增量的
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29//getContainerDifferential会构造一个ALL_APPS_DELTA类型的key Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); responseCache.getGZIP(cacheKey) -----------------> 下面这些代码在缓存设计博客说到了 Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; if (useReadOnlyCache) { //先从只读缓存拿 final Value currentPayload = readOnlyCacheMap.get(key); if (currentPayload != null) { //拿到了直接返回 payload = currentPayload; } else { //去读写缓存拿,拿到了回写只读缓存 //readWriteCacheMap是基于guava的cache来的,如果readWriteCacheMap拿不到 //会回调ResponseCacheImpl的构造方法里的generatePayload(key);这行代码 payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } return payload; }
generatePayload(key)
复制代码
1
2
3
4//获取增量的逻辑 payload = getPayLoad(key, registry.getApplicationDeltas());
registry.getApplicationDeltas()
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13public Applications getApplicationDeltas() { Applications apps = new Applications(); Map<String, Application> applicationInstancesMap = new HashMap<String, Application>(); //recentlyChangedQueue这个队列里面就是存储增量微服务的,这个队列有个神奇的地方,只会存储最近操作三分钟以内的微服务 Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator(); //遍历增量队列返回增量 while (iter.hasNext()) { } return apps; }
增量队列三分钟的代码
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator(); while (it.hasNext()) { //getRetentionTimeInMSInDeltaQueue=3min if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; }
总结
1:eureka-client每隔30s去eureka-client拉取一次信息
2:在默认配置情况下,可能初始化的时候全量拉取,稳定后基本上都是增量拉取
3:增量拉取失败会进行全量拉取
4:eureka-server保有一个recentlyChangeQueue存储最近三分钟发生改动的微服务实例数据(定时器实现),客户端进行增量拉取就是拉取这个队列
最后
以上就是奋斗小笼包最近收集整理的关于springcloud源码之eureka-client服务发现springcloud源码之eureka-client服务发现的全部内容,更多相关springcloud源码之eureka-client服务发现springcloud源码之eureka-client服务发现内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复