Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现
- 背景
- 开搞
- 项目pom.xml
- 调用处理方法
- 自定义注解(类~@FeignClient)
- 使用注解定义服务调用方法
- 编写动态代理
- 创建代理接口
- 编写代理实现
- 测试
- 自动注入获取业务接口实例
- 方法调用
- 欢迎留言指正、讨论
经验总结,如有错误请指正!
背景
Spring Gateway v3.0.1为响应式框架,但是在微服务中避免不了微服务之间相互调用,但微服务大部分都是Spring Boot开发的,属于阻塞式框架,所以这种情况就属于响应式程序调用阻塞式程序提供的接口;
OpenFeign是阻塞式的,所以没办法在Gateway中直接使用,官方文档对响应式支持这样写的;
Spring Cloud Openfeign | Reactive Support

意思就是有个 feign-reactive 的框架可以支持,这个框架是PlaytikaOSS根据OpenFeign开发的一个框架;
但是我没有使用,我想自己封装一下,然后就可以实现像OpenFeign那样调用就可以了,也是增长一下水平,响应式框架的Http客户端有WebClient,可以基于WebClient调用封装一个类似OpenFeign的小实现(参考Openfeign)
开搞
项目pom.xml
本次介绍的封装实现主要是依赖nacos、gateway、loadbalancer,如下所示,其余的按需自行增配
<!--nacos-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!--spring cloud gateway-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<!--客户端负载均衡loadbalancer-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>
<!-- loadbalancer 缓存优化 -->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
调用处理方法
先编写实际处理业务接口调用的WebClient实现方法,这里写了一个简单的工具类
@Slf4j
@Component
public class SmartWebClient {
private final WebClient smartWebClient;
// ReactorLoadBalancerExchangeFilterFunction是LoadBalance的一个负载
// 可以适配nacos的服务发现,将其注入到WebClient即可实现服务名称调用及负载
public SmartWebClient(ReactorLoadBalancerExchangeFilterFunction lbFunction) {
this.smartWebClient = WebClient.builder()
.filter(lbFunction)
// 默认请求头
.defaultHeader(WEB_CLIENT_HEADER, WEB_CLIENT_HEADER_VALUE)
.build();
}
// TODO 调用方法,响应Mono类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
public <T> Mono<T> restMono(HttpMethod httpMethod, String url,
Map<String, String> params, MultiValueMap<String, String> body,
MediaType reqMediaType, MediaType respMediaType,
Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
return smartWebClient
.method(httpMethod) // post get put ……
.uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
.accept(respMediaType) // contentType
.contentType(reqMediaType) // contentType
// 这里可以放入BodyInserters.from……多种方法,按需使用
.body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
.retrieve() // 发送请求
// 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
.onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle)
// 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
.bodyToMono(resultType);
}
// TODO 调用方法,响应Flux类型,自己可以按照需要创建实体来封装请求信息,这里仅作为例子
public <T> Flux<T> restFlux(HttpMethod httpMethod, String url,
Map<String, String> params, MultiValueMap<String, String> body,
MediaType reqMediaType, MediaType respMediaType,
Class<T> resultType, Function<ClientResponse, Mono<? extends Throwable>> unSuccessHandle) {
return smartWebClient
.method(httpMethod) // post get put ……
.uri(url, params) // 这里的params是拼接在url中的,可以用@PathVariable接收的参数
.accept(respMediaType) // contentType
.contentType(reqMediaType) // contentType
// 这里可以放入BodyInserters.from……多种方法,按需使用
.body(BodyInserters.fromFormData(body)) // 构造的参数可以用@RequestParam、@RequestBody接收
.retrieve() // 发送请求
// 第一个参数判断状态,为调用方法的条件,第二个传入lambda表达式,返回Mono<异常>
.onStatus(status -> !status.is2xxSuccessful(), unSuccessHandle)
// 结果转换,将响应转换为什么类型的结果,结果为响应式类型Mono/Flux
.bodyToFlux(resultType);
}
}
实体
@Data
public class WebClientRestInfo<T> {
/**
* 请求方式
*/
private HttpMethod httpMethod = HttpMethod.GET;
/**
* 请求地址
*/
private String url;
/**
* 请求path参数
*/
private Map<String, Object> pathVariable;
/**
* 请求表单/Body
*/
private MultiValueMap<String, Object> formValues;
/**
* 编码
*/
private MediaType reqMediaType = MediaType.APPLICATION_JSON;
private MediaType respMediaType = MediaType.APPLICATION_JSON;
/**
* 是否为Flux
*/
private boolean isFlux = false;
/**
* 返回值类型
*/
private Class<T> resultType;
}
自定义注解(类~@FeignClient)
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface ReactiveFeignClient {
String name();
String path();
Class<?> fallback() default void.class;
}
使用注解定义服务调用方法
和Openfeign一样,从其他服务的controller参考创建服务接口即可,实体在服务内创建即可,但返回结果需要加上Mono(单个结果)或者Flux(多个结果),这是响应式编程必要的
@ReactiveFeignClient(name = "/order-service", path = "/test")
public interface OrderService {
@PostMapping(value = "/getOrder")
Mono<OrderInfo> getOrder(@RequestParam String orderCode);
}
编写动态代理
根据注解所在类,使用反射获取信息,调用业务处理方法
这里使用JDK代理方式
创建代理接口
public interface ReactiveFeignClientProxy {
// 因为WebClient调用需要负载,所以需要将负载通过自动注入传进来
<T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction);
}
编写代理实现
@Slf4j
public class ReactiveFeignClientProxyCreator implements ReactiveFeignClientProxy {
/**
* 代理实例化
*
* @param type
* @param lbFunction 需要传入LoadBalance负载均衡实例
* @param <T>
* @return
*/
@Override
public <T> T createProxyObject(Class<T> type, ReactorLoadBalancerExchangeFilterFunction lbFunction) {
log.info("ReactiveFeignClientProxyCreator.createProxy: {}", type);
// 构建WebClient客户端
SmartWebClient smartWebClient = new SmartWebClient(lbFunction);
// 获取注解信息
ReactiveFeignClient annotationClass = type.getAnnotation(ReactiveFeignClient.class);
String serviceName = annotationClass.name();
String serviceBasePath = annotationClass.path();
Class fallbackClass = annotationClass.fallback();
// 创建代理方法
Object obj = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, (proxy, method, args) -> {
// 根据方法和参数得到调用服务器信息
String url = "http://" + serviceName + serviceBasePath;
// TODO 这里我是封装了对象,多个变量信息也是一样的,参考逻辑即可
WebClientRestInfo<T> webClientRestInfo = new WebClientRestInfo<T>();
webClientRestInfo.setHttpMethod(HttpMethod.GET);
webClientRestInfo.setUrl(url);
// 得到请求URL和请求方法
contractUrlAndMethod(method, webClientRestInfo);
// 得到调用的参数和body
contractRequestParams(method, args, webClientRestInfo);
// 返回flux还是mono
// isAssignableFrom 判断类型是否是某个类的子类 instanceof 判断实例是否是某个类的子类
boolean isFlux = method.getReturnType().isAssignableFrom(Flux.class);
webClientRestInfo.setFlux(isFlux);
// 得到返回对象的实际类型
Class<T> elementType = extractElementType(method.getGenericReturnType());
webClientRestInfo.setResultType(elementType);
// 调用rest 注意根据类型分别调用Mono和Flux两种请求处理
CorePublisher<T> webClientResult = null;
if (isFlux) {
// TODO 第二个参数为判断响应状态后的处理Function
webClientResult = smartWebClient.restFlux(webClientRestInfo, resp -> {
log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
}).onErrorResume(Exception.class, throwable -> {
// 失败时调用方法 onErrorResume可以使调用失败后按照自定义处理并继续返回响应式结果
return (Flux<T>) fallback(fallbackClass, type, method, args);
});
} else {
// 调用rest 同上
webClientResult = smartWebClient.restMono(webClientRestInfo, resp -> {
log.error("调用{}.{}方法发生错误,调用Fallback", type.getSimpleName(), method.getName());
return Mono.error(new RuntimeException(type.getSimpleName() + "." + method.getName() + "Request Fail."));
}).onErrorResume(Exception.class, throwable -> {
// 失败时调用方法
return (Mono<T>) fallback(fallbackClass, type, method, args);
});
}
// 返回调用结果
return webClientResult;
});
return (T) obj;
}
/**
* 得到缺省类型的实际类型
*
* @param genericReturnType
* @return
*/
private <T> Class<T> extractElementType(Type genericReturnType) {
Type[] actualTypeArguments = ((ParameterizedType) genericReturnType).getActualTypeArguments();
for (Type t : actualTypeArguments) {
if (t instanceof Class) {
return (Class<T>) t;
} else {
Type[] aaa = ((ParameterizedType) t).getActualTypeArguments();
return (Class<T>) ((ParameterizedType) t).getRawType();
}
}
return (Class<T>) actualTypeArguments[0];
}
/**
* 得到请求URL和请求方法
*
* @param method
* @param webClientRestInfo
*/
private void contractUrlAndMethod(Method method, WebClientRestInfo webClientRestInfo) {
String url = webClientRestInfo.getUrl();
Annotation[] annotationsMethod = method.getAnnotations();
for (Annotation annotation : annotationsMethod) {
// GET
if (annotation instanceof GetMapping) {
GetMapping a = (GetMapping) annotation;
url += a.value()[0];
webClientRestInfo.setHttpMethod(HttpMethod.GET);
}
// POST
else if (annotation instanceof PostMapping) {
PostMapping a = (PostMapping) annotation;
url += a.value()[0];
webClientRestInfo.setHttpMethod(HttpMethod.POST);
}
// DELETE
else if (annotation instanceof DeleteMapping) {
DeleteMapping a = (DeleteMapping) annotation;
url += a.value()[0];
webClientRestInfo.setHttpMethod(HttpMethod.DELETE);
}
// PUT
else if (annotation instanceof PutMapping) {
PutMapping a = (PutMapping) annotation;
url += a.value()[0];
webClientRestInfo.setHttpMethod(HttpMethod.PUT);
}
}
webClientRestInfo.setUrl(url);
}
/**
* 得到调用的参数和body
*
* @param method
* @param args
* @param webClientRestInfo
*/
private void contractRequestParams(Method method, Object[] args, WebClientRestInfo webClientRestInfo) {
// 参数和值对应的map
Map<String, Object> params = new LinkedHashMap<>();
MultiValueMap<String, Object> formValue = new LinkedMultiValueMap<>();
// 得到调用的参数和body
Parameter[] parameters = method.getParameters();
for (int i = 0; i < parameters.length; i++) {
// 是否带 @PathVariable注解
PathVariable annoPath = parameters[i].getAnnotation(PathVariable.class);
if (annoPath != null) {
params.put(annoPath.value(), args[i]);
}
// 是否带了 RequestParam
RequestParam annoParam = parameters[i].getAnnotation(RequestParam.class);
// 是否带了 RequestBody
RequestBody annoBody = parameters[i].getAnnotation(RequestBody.class);
if (annoParam != null || annoBody != null) {
formValue.add(parameters[i].getName(), args[i]);
}
}
webClientRestInfo.setPathVariable(params);
webClientRestInfo.setFormValues(formValue);
}
/**
* 调用fallback方法
*
* @param fallbackClass
* @param proxyType
* @param method
* @param args
* @param <T>
* @return
*/
private <T> Object fallback(Class fallbackClass, Class<T> proxyType,
Method method, Object[] args) {
// 失败时调用方法
try {
return fallbackClass.getMethod(
method.getName(),
method.getParameterTypes()
).invoke(fallbackClass.newInstance(), args);
} catch (NoSuchMethodException e) {
log.error("未找到{}.{}方法的Fallback", proxyType.getSimpleName(), method.getName());
} catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
log.error("实例化{}的FallbackClass失败", proxyType.getSimpleName());
}
return Mono.empty();
}
}
测试
可用通过自动注入,调用代理获取实例
自动注入获取业务接口实例
private OrderService orderService;
@Resource
public void setOrderService (ReactorLoadBalancerExchangeFilterFunction lbFunction) {
this.orderService = new ReactiveFeignClientProxyCreator().createProxyObject(OrderService.class, lbFunction);
}
方法调用
像Openfeign一样使用,就会根据自定义注解声明的服务名称去nacos中找到服务地址,并通过LoadBalance负载调用服务,然后根据注解声明的接口信息调用的Controller接口
// 响应式线程处理
Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
// 返回结果或者订阅响应处理
orderInfo.subscribe(res -> {
//TODO
})
// 阻塞式线程处理(不推荐)
Mono<OrderInfo> orderInfo = orderService.getOrder("order-code");
// block可以阻塞等待获取结果
OrderInfo orderInfo = orderInfo.block();
欢迎留言指正、讨论
最后
以上就是懵懂可乐最近收集整理的关于Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign的Reactive实现背景开搞测试欢迎留言指正、讨论的全部内容,更多相关Reactive响应式WebClient负载调用微服务接口封装,自行编写类OpenFeign内容请搜索靠谱客的其他文章。
发表评论 取消回复