在学习了Dubbo之后, 我发现自己好像了解了Dubbo的实现原理, 又好像不是很了解, 毕竟我只是背诵了下概念, 没有深入的去看源码. 这里我就来手写一个简化版的Dubbo框架, 通过动手实践来深入理解Dubbo的实现原理.
Dubbo的实现原理
RPC调用的过程
我们先来看下RPC调用的过程.
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行RPC调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
RPC调用的原理
RPC调用的原理是: 动态代理, 反射, 网络传输.
- 消费者从注册中心获取到服务提供者的地址后, 与服务提供者建立TCP连接.
- 消费者将服务的全限定类名(String), 方法名(String), 方法参数类型(Class[]), 方法参数(Object[]), 通过TCP传输给服务提供者.
- 服务提供者获取到这些数据后, 通过反射调用对应服务的方法, 然后将执行结果通过TCP返回给服务消费者.
- 整个RPC调用过程被封装到动态代理中, 对用户来说是透明的.
Dubbo架构
Dubbo框架设计分为十层:
- service 服务层, 为服务提供者和服务消费者提供接口.
- config 配置层, 提供dubbo的各种配置.
- proxy 服务接口透明代理, 生成动态代理.
- registry 注册中心层, 负责服务的注册与发现.
- cluster 路由层, 封装多个提供者的路由及负载均衡.
- monitor 监控层, RPC调用次数和调用时间监控.
- protocol 远程调用层, 封装 RPC 调用.
- exchange 信息交换层, 封装请求响应模式, 同步转异步.
- transport 网络传输层, 抽象 mina 和 netty 为统一接口.
- serialize 数据序列化层, 提供数据序列化的接口.
手写简化版的Bubbo框架
我们根据Dubbo的框架设计来手写一个简化版的Dubbo, 其中序列化协议使用Java原生的Serializable, 网络传输协议使用原生的TCP, 负载均衡使用随机算法, 注册中心使用ZooKeeper, 动态代理使用JDK Proxy.
github地址: 手写一个简化版的Dubbo框架
服务提供者
(1) ZooKeeper常量
定义了ZooKeeper的地址和Dubbo注册中心的根节点路径.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/** * @author litianxiang * @date 2020/3/17 11:45 */ public class ZooKeeperConst { /** * ZooKeeper的地址 */ public static String host = "xxx.xx.xx.xxx:2181"; /** * Dubbo在ZooKeeper上的根节点 */ public static String rootNode = "/dubbo"; }
(2) 注册中心
这里使用ZooKeeper来实现注册中心, 将服务及服务提供者地址注册到注册中心.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60/** * @author litianxiang * @date 2020/3/17 11:28 */ public class RegisterCenter { private static Logger logger = LoggerFactory.getLogger(RegisterCenter.class); private ZooKeeper zk; /** * 连接ZooKeeper, 创建dubbo根节点 */ public RegisterCenter() { try { CountDownLatch connectedSignal = new CountDownLatch(1); zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); //因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例. connectedSignal.await(); //创建dubbo注册中心的根节点(持久节点) if (zk.exists(ZooKeeperConst.rootNode, false) == null) { zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("connect zookeeper server error.", e); } } /** * 将服务和服务提供者URL注册到注册中心 * @param serviceName 服务名称 * @param serviceProviderAddr 服务所在TCP地址 */ public void register(String serviceName, String serviceProviderAddr) { try { //创建服务节点 String servicePath = ZooKeeperConst.rootNode + "/" + serviceName; if (zk.exists(servicePath, false) == null) { zk.create(servicePath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //创建服务提供者节点 String serviceProviderPath = servicePath + "/" + serviceProviderAddr; if (zk.exists(serviceProviderPath, false) == null) { zk.create(serviceProviderPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } logger.info("服务注册成功, 服务路径: " + serviceProviderPath); } catch (Exception e) { logger.error("注册中心-注册服务报错", e); } } }
(3) 接口全限定类名, 方法名, 方法参数类型, 方法参数的包装类
这里为了简单, 使用Java自带的序列化协议.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64/** * 封装接口名, 方法名, 参数字节码数组, 参数对象 */ public class Invocation implements Serializable { private static final long serialVersionUID = -2798340582119604989L; /** * 接口名 */ private String interfaceName; /** * 方法名 */ private String methodName; /** * 参数字节码数组 */ private Class[] paramTypes; /** * 参数对象 */ private Object[] params; public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) { this.interfaceName = interfaceName; this.methodName = methodName; this.paramTypes = paramTypes; this.params = params; } public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class[] getParamTypes() { return paramTypes; } public void setParamTypes(Class[] paramTypes) { this.paramTypes = paramTypes; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } }
(4) RPC监听服务
用来监听Consumer远程调用的TCP连接, 接收到Consumer传输过来的数据后, 通过反射调用对应的方法, 然后将结果返回给Consumer. Dubbo使用的是Netty框架, 这里为了简单, 我们使用原生的TCP连接.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103/** * RPC监听服务, 监听consumer远程调用的tcp连接 * @author litianxiang * @date 2020/3/17 18:01 */ public class RpcServer { private static Logger logger = LoggerFactory.getLogger(RpcServer.class); private Map<String, Class> serviceMap; public RpcServer(Map<String, Class> serviceMap) { this.serviceMap = serviceMap; } /** * 启动RPC监听服务 */ public void start() { //监听端口, 处理rpc请求 ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(12000); logger.info("RPC监听服务启动..."); while (true) { Socket socket = serverSocket.accept(); new Thread(new ServerHandler(socket, serviceMap)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { if (serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } /** * 处理RPC, 通过反射执行方法 * @author litianxiang * @date 2020/3/6 17:52 */ public class ServerHandler implements Runnable { private Socket socket; private Map<String, Class> serviceMap; public ServerHandler(Socket socket, Map<String, Class> serviceMap) { this.socket = socket; this.serviceMap = serviceMap; } @Override public void run() { ObjectInputStream in = null; ObjectOutputStream out = null; try { in = new ObjectInputStream(socket.getInputStream()); out = new ObjectOutputStream(socket.getOutputStream()); //获取Invocation对象 Invocation invocation = (Invocation) in.readObject(); //执行对应方法 Class clazz = serviceMap.get(invocation.getInterfaceName()); Method method = clazz.getMethod(invocation.getMethodName(), invocation.getParamTypes()); Object invoke = method.invoke(clazz.newInstance(), invocation.getParams()); //返回方法执行结果 out.writeObject(invoke); out.flush(); } catch (Exception e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { try { out.close(); } catch (Exception e) { e.printStackTrace(); } } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } socket = null; } } }
(5) Provider启动类
这里会模拟dubbo的service配置, 将接口名及其对应的实现类储存到serviceMap中, 然后将服务和服务提供者地址注册到注册中心, 最后再启动对Consumer远程调用的监听.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/** * @author litianxiang * @date 2020/3/6 15:32 */ public class Provider { private static Logger logger = LoggerFactory.getLogger(Provider.class); private static Map<String, Class> serviceMap = new HashMap<>(); private static String tcpHost = "127.0.0.1:12000"; static { /** * 模拟service配置处理逻辑 * <dubbo:service interface="com.client.service.IBookService" ref="bookService" /> * <bean id="bookService" class="com.provider.service.BookServiceImpl" /> */ serviceMap.put(IBookService.class.getName(), BookServiceImpl.class); } public static void main(String[] args) { //将服务和服务提供者URL注册到注册中心 RegisterCenter registerCenter = new RegisterCenter(); for (Map.Entry<String, Class> entry : serviceMap.entrySet()) { registerCenter.register(entry.getKey(), tcpHost); } //监听Consumer的远程调用(为了简化代码, 这里使用TCP代替Netty) RpcServer rpcServer = new RpcServer(serviceMap); rpcServer.start(); } }
服务消费者
(1) 负载均衡
为了简单, 这里直接使用的是随机算法.
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class RandomLoadBalance { /** * 随机一个provider * @param providerList provider列表 * @return provider */ public String doSelect(List<String> providerList) { int size = providerList.size(); Random random = new Random(); return providerList.get(random.nextInt(size)); } }
(2) 服务订阅类
服务订阅类提供向注册中心订阅服务的功能, 涉及服务发现与负载均衡.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67public class ServiceSubscribe { private static Logger logger = LoggerFactory.getLogger(ServiceSubscribe.class); private ZooKeeper zk; private List<String> providerList; /** * 连接ZooKeeper, 创建dubbo根节点 */ public ServiceSubscribe() { try { CountDownLatch connectedSignal = new CountDownLatch(1); zk = new ZooKeeper(ZooKeeperConst.host, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); //因为监听器是异步操作, 要保证监听器操作先完成, 即要确保先连接上ZooKeeper再返回实例. connectedSignal.await(); //创建dubbo注册中心的根节点(持久节点) if (zk.exists(ZooKeeperConst.rootNode, false) == null) { zk.create(ZooKeeperConst.rootNode, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { logger.error("connect zookeeper server error.", e); } } /** * 在注册中心订阅服务, 返回对应的服务url * 只要第一次获取到了服务的RPC地址, 后面注册中心挂掉之后, 仍然可以继续通信. * @param serviceName 服务名称 * @return 服务host */ public String subscribe(String serviceName) { //服务节点路径 String servicePath = ZooKeeperConst.rootNode + "/" + serviceName; try { //获取服务节点下的所有子节点, 即服务的RPC地址 providerList = zk.getChildren(servicePath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { try { //循环监听 providerList = zk.getChildren(servicePath, true); } catch (KeeperException | InterruptedException e) { logger.error("Consumer在ZooKeeper订阅服务-注册监听器报错", e); } } } }); } catch (Exception e) { logger.error("从注册中心获取服务报错.", e); } logger.info(serviceName + "的服务提供者列表: " + providerList); //负载均衡 RandomLoadBalance randomLoadBalance = new RandomLoadBalance(); return randomLoadBalance.doSelect(providerList); } }
(3) RPC代理类
根据JDK Proxy生成一个代理对象, 封装RPC调用的过程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class RpcServiceProxy { private ServiceSubscribe serviceSubscribe; public RpcServiceProxy(ServiceSubscribe serviceSubscribe) { this.serviceSubscribe = serviceSubscribe; } /** * 获取RPC代理 * @param clazz * @return */ public Object getProxy(final Class clazz) { return Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //在注册中心订阅服务, 返回对应的服务url String rpcHost = serviceSubscribe.subscribe(clazz.getName()); String[] split = rpcHost.split(":"); //与远程服务建立连接 Socket socket = new Socket(split[0], Integer.parseInt(split[1])); ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); //向RPC服务传输Invocation对象 String className = clazz.getName(); String methodName = method.getName(); Class[] paramTypes = method.getParameterTypes(); Invocation invocation = new Invocation(className, methodName, paramTypes, args); out.writeObject(invocation); out.flush(); //接收方法执行结果 Object object = in.readObject(); in.close(); out.close(); socket.close(); return object; } }); } }
(4) Consumer启动类
消费者启动后, 会向注册中心订阅服务, 经过负载均衡获取到对应的服务后, 再进行RPC调用.
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); public static void main(String[] args) { //在注册中心订阅服务, 获取服务所在的url, 然后通过代理远程调用服务 ServiceSubscribe serviceSubscribe = new ServiceSubscribe(); RpcServiceProxy rpcServiceProxy = new RpcServiceProxy(serviceSubscribe); //获取RPC代理 IBookService bookService = (IBookService) rpcServiceProxy.getProxy(IBookService.class); BookDTO bookInfo = bookService.getBookInfo(1); System.out.println(bookInfo); } }
测试
(1) 先修改注册中心的地址
1
2public static String host = "xxx.xx.xx.xxx:2181";
(2) Service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72public class BookDTO implements Serializable{ private static final long serialVersionUID = 1934175717377394706L; private int id; private String name; private String desc; private String author; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } @Override public String toString() { return "BookDTO{" + "id=" + id + ", name='" + name + ''' + ", desc='" + desc + ''' + ", author='" + author + ''' + '}'; } } public interface IBookService { BookDTO getBookInfo(int id); } public class BookServiceImpl implements IBookService { @Override public BookDTO getBookInfo(int id) { if (id == 1) { BookDTO bookDTO = new BookDTO(); bookDTO.setId(1); bookDTO.setName("仙逆"); bookDTO.setDesc("顺为凡, 逆为仙, 只在心中一念间."); bookDTO.setAuthor("耳根"); return bookDTO; } else { return new BookDTO(); } } }
(3) 启动Provider
(4) 启动Consumer
最后
以上就是快乐秋天最近收集整理的关于手写一个简化版的Dubbo框架的全部内容,更多相关手写一个简化版内容请搜索靠谱客的其他文章。
发表评论 取消回复