在上一篇文章,我们通过 BIO 实现了一个简易的 RPC 框架,使用 BIO 的优点是编码简单,但是问题也很明显,因为是同步阻塞式 IO,所以为了实现并发处理,需要给每个连接都分配一个线程,这样势必很浪费资源,导致业务体量很容易因为硬件出现瓶颈。
本篇我们就将这个 RPC 框架的底层通信方式升级为 Netty,Netty 是对 NIO 的封装,而 NIO 是基于 IO 多路复用的设计模式,通过轮询处理各个事件,大大节约了系统资源。下面我们就来看看具体该怎么做吧
可以看到,框架的整体结构并没有改变,还是那3个包,还是那几个类。但在此之上,我还添加了一个 @RpcService 注解,去优化手动发布服务为扫描注解自动发布。
注:如果相对于 version1 没有改变的类,我会在标题后标注上“未变”,看过 version1 的同学可以直接看变化的地方。
1.RpcRequest(未变)
RpcRequest 封装了消费者要调用方法的具体信息,是我们的自定义协议,或者说是消息格式。
涉及两个过程:
- Consumer 编码:RpcRequest -> 数据流(二进制) => 告诉 Provider 要执行哪个方法
- Provider 解码:数据流(二进制)-> RpcRequest => 获取 Consumer 要调用哪个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 注:只有实现了序列化接口,才能实现远程传输 public class RpcRequest implements Serializable { private String className; // 类(接口/服务) private String methodName; // 方法 private Object[] parameters; // 参数 public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Object[] getParameters() { return parameters; } public void setParameters(Object[] parameters) { this.parameters = parameters; } }
2.RpcService(新增)
用来标识要发布的服务
1
2
3
4
5
6@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) public @interface RpcService { }
3.RpcServer(修改)*
这里不再是像 version1 的 BIO 那样通过 new ServerSocket() 创建服务端,然后再用 accept() 接受连接了,而是通过 Netty 去进行通信:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55public class RpcServer{ // 初始化主线程池,Selector NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 初始化子线程池,对应具体客户端处理逻辑 NioEventLoopGroup workerGrop = new NioEventLoopGroup(); /** * 发布服务 * 注:这里因为改造成了@RpcService自动扫描,所以不再需要传入服务实例了,只用传一个端口就行 */ public void publisher(int port) throws InterruptedException { // 创建服务端 ServerBootstrap server = new ServerBootstrap(); // 无锁式串行化编程 server.group(bossGroup, workerGrop) .channel(NioServerSocketChannel.class) // 当有请求来的时候会将 Pipeline 中的所有 ChannelHandler 的走一遍 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /** * LengthFieldBasedFrameDecoder入参有5个,分别解释如下 * maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。 * lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置 * lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8) * lengthAdjustment:要添加到长度字段值的补偿值 * initialBytesToStrip:从解码帧中去除的第一个字节数 */ // 1.自定义协议(InvokerProtocol)的解码器 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); // 2.对象参数类型的编解码器 pipeline.addLast("encoder",new ObjectEncoder()); pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); // 3.处理请求的自定义 Handler pipeline.addLast(new ProcessorHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // 最大SelectionKey数量 .childOption(ChannelOption.SO_KEEPALIVE, true); // 保证所有子线程是长连接,且可以重复利用 // 绑定端口,并阻塞 ChannelFuture future = server.bind(port).sync(); // 正式启动服务,相当于用一个死循环开始轮训 future.channel().closeFuture().sync(); } }
4.ProcessorHandler(修改)*
Provider 线程的具体调用逻辑:
- 扫描指定包下的所有所有 Java 文件,获取它们的全类名
- 将标有 @RpcService 的服务实现创建实例对象,然后放入map中保存(单例)
- 等有请求来到后获取到相应 Method,然后执行方法,写出结果
注:由于在上面已经配置好了编解码器,所以可以直接获取到 RpcRequest 对象,然后在写出时编码器会对内容编码成二进制流;但 BIO 这里就需要 ObjectInputStream 和 ObjectOutputStream 手动编码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78public class ProcessorHandler extends SimpleChannelInboundHandler<RpcRequest> { // 扫描目标包后所有 Java 文件的全类名 private List<String> classNames = new ArrayList<>(); // 有 @RpcService 注解的服务bean的(serviceName,instance) private Map<String, Object> registryMap = new ConcurrentHashMap<>(); public ProcessorHandler() { try { scannerClass("com.xupt.yzh.provider"); doRegistry(); } catch (Exception e) { e.printStackTrace(); } } private void scannerClass(String packageName) { // 根据包名获取绝对路径 URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\.", "/")); File classpath = new File(url.getFile()); for (File file : classpath.listFiles()) { if (file.isDirectory()) { scannerClass(packageName + "." + file.getName()); } else { // 将扫描到的类的全类名放入 classNames classNames.add(packageName + "." + file.getName().replace(".class", "")); } } } private void doRegistry() throws Exception { if (classNames.isEmpty()) { return; } for (String className : classNames) { Class<?> clazz = Class.forName(className); RpcService annotation = clazz.getAnnotation(RpcService.class); if (annotation != null) { // 注:如果实现了多个接口,只取第一个 Class<?> i = clazz.getInterfaces()[0]; String serviceName = i.getName(); // 注册 registryMap.put(serviceName, clazz.newInstance()); } } } // Consumer 发送请求过来时触发回调 @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { if (registryMap.containsKey(request.getClassName())) { Object service = registryMap.get(request.getClassName()); // 根据实参获取方法的形参 // 注:获取形参列表后才能确定一个方法 Object[] args = request.getParameters(); Class<?>[] types = new Class[args.length]; for (int i = 0; i < args.length; i++) { types[i] = args[i].getClass(); } // 获取 Method Method method = service.getClass().getMethod(request.getMethodName(), types); // 执行方法 Object res = method.invoke(service, args); ctx.writeAndFlush(res); ctx.close(); } } }
5.RpcProxyClient(未变)
代理对象,通过 JDK 动态代理生成一个代理对象
PS:这里创建一个代理对象是因为,服务的实现实例在 Provider,但 Consumer 调用服务的具体方法时也需要一个实例,而 Consumer 并没有这个实例。
1
2
3
4
5
6
7
8
9public class RpcProxyClient { public <T>T clientProxy(final Class<T> interfaceCls, final String host, final int port) { return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class[]{interfaceCls}, new RemoteInvocationHandler(host, port)); } }
6.RemoteInvocationHandler(未变)
代理对象的具体逻辑,核心是 invoke 方法,当 Consumer 调用了服务的方法时,就会走到 invoke():
- 构建请求信息 RpcRequest
- 发送(编码):将 RpcRequest 转换成二进制流,发送
- 等待 Provider 处理结果
- 接收(解码):接收 Provider 的处理结果,将二进制流转换成基本类型/Java对象,并返回给上层函数
注:234步的逻辑都是网络 IO 相关,所以后面单独封装了一个 RpcNetTransport 类去实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class RemoteInvocationHandler implements InvocationHandler { String host; int port; public RemoteInvocationHandler(String host, int port) { this.host = host; this.port = port; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 构建调用Provider的请求参数 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); // rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); // 进行远程调用,并返回执行结果 RpcNetTransport netTransport = new RpcNetTransport(host, port); Object res = netTransport.send(rpcRequest); return res; } }
7.RpcNetTransport(修改)*
将原来的 BIO 网络通信修改成 Netty 代码,总体与 RpcServer 的代码有点类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66public class RpcNetTransport { String host; int port; public RpcNetTransport(String host, int port) { this.host = host; this.port = port; } public Object send(RpcRequest rpcRequest) throws InterruptedException { NioEventLoopGroup wokergroup = new NioEventLoopGroup(); TransportHandler transportHandler = new TransportHandler(); // 创建客户端 Bootstrap client = new Bootstrap(); client.group(wokergroup) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 1.自定义协议(InvokerProtocol)的解码器 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast(new LengthFieldPrepender(4)); // 2.对象参数类型的编解码器 pipeline.addLast("encoder",new ObjectEncoder()); pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, // 3.接收 Provider 处理结果的自定义 Channel Handler pipeline.addLast(transportHandler); } }); // 建立连接 ChannelFuture future = client.connect(host, port).sync(); // 发送请求信息 future.channel().writeAndFlush(rpcRequest); future.channel().closeFuture().sync(); // 返回请求信息 return transportHandler.getResponse(); } // 内部类 static class TransportHandler extends ChannelInboundHandlerAdapter { private Object response; // 接收到消息时触发该回调 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response = msg; } public Object getResponse() { return response; } } }
好了,到此我们对 BIO 到 Netty 的改造就完成了,由于我们只是做了底层实现的优化,所以测试代码并不用改变,这也是实际开发中项目迭代的一个基本要求,要兼容老版本。
结果测试(未变)
api
1
2
3
4
5public interface TestService { String test(String name); }
Provider
服务实现类:
1
2
3
4
5
6
7
8
9
10
11
12
13@RpcService public class TestServiceImpl implements TestService { @Override public String test(String name) { System.out.println("new requst coming..." + name); Random random = new Random(); String json = "{"name":" + """ + name + """ + ", "age":" + random.nextInt(40) + "}"; return json; } }
发布服务:
1
2
3
4
5
6
7
8
9
10public class Provider { public static void main(String[] args) throws InterruptedException { RpcServer proxyServer = new RpcServer(); // 注:这里不用再传入服务实例 proxyServer.publisher(8080); } }
Consumer
远程调用服务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class Consumer { public static void main(String[] args) { // 创建代理对象 RpcProxyClient rpcProxyClient = new RpcProxyClient(); // 创建一个代理对象 TestService service = rpcProxyClient.clientProxy(TestService.class, "localhost", 8080); // test() 会进行远程调用 String json = service.test("老五"); System.out.println(json); } }
结果如下:
完整代码我放到 GitHub 上了,有需要参考的同学点击这里跳转…
最后
以上就是虚心项链最近收集整理的关于【RPC】手写简易 RPC 框架 --重构,实现基于 Netty 通信的全部内容,更多相关【RPC】手写简易内容请搜索靠谱客的其他文章。
发表评论 取消回复