Posted by 俊逸自行车 on 靠谱客. This article introduces Pulsar's client access-control features (part 1) and is shared here for reference.

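1. Overview

I have recently been studying how Pulsar verifies clients. The verification has two parts:

  • Client connections
  • Client access control

Pulsar handles these two parts with authentication and authorization respectively, and each of them is pluggable through an interface.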
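To make the split concrete, here is a minimal sketch of the two steps in plain Java. It is illustrative only: AuthFlowSketch, the token "secret-token", and both lookup tables are hypothetical and are not Pulsar classes or APIs. Authentication maps a credential to a role; authorization then decides what that role may do.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration only; none of this is Pulsar API. */
final class AuthFlowSketch {
    // Assumed credential -> role table. Authentication answers "who are you?".
    private static final Map<String, String> ROLE_BY_TOKEN =
            Map.of("secret-token", "vv-role");

    // Assumed role -> permitted actions table. Authorization answers "what may you do?".
    private static final Map<String, Set<String>> ACTIONS_BY_ROLE =
            Map.of("vv-role", Set.of("produce", "consume"));

    /** Step 1: map a credential to a role, or empty if the credential is unknown. */
    static Optional<String> authenticate(String token) {
        return Optional.ofNullable(token).map(ROLE_BY_TOKEN::get);
    }

    /** Step 2: decide whether an already-authenticated role may perform an action. */
    static boolean authorize(String role, String action) {
        return ACTIONS_BY_ROLE.getOrDefault(role, Set.of()).contains(action);
    }

    public static void main(String[] args) {
        Optional<String> role = authenticate("secret-token");
        System.out.println("role = " + role.orElse("<rejected>"));
        System.out.println("may produce = "
                + role.map(r -> authorize(r, "produce")).orElse(false));
    }
}
```

The point of keeping the two steps separate is that the role produced by the first step is the only input the second step needs, so either side can be swapped out independently.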

2. Version

Pulsar 2.8.0

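3. Client connection authentication

By default, Pulsar does not enable connection authentication, so access from clients to brokers and from one broker to another is completely unrestricted. In a production environment, however, access control is usually important.

3.1 Enable client connection authentication in broker.conf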

    # Enable authentication
    authenticationEnabled=true

    # Authentication provider name list, which is comma separated list of class names
    # Any number of provider classes can be listed here; when the broker receives a
    # client connection it calls the methods of these classes to handle it.
    authenticationProviders=auth.server.VVAuthenticationProvider

    # Interval of time for checking for expired authentication credentials
    authenticationRefreshCheckSeconds=60

    # Authentication settings of the broker itself. Used when the broker connects to other brokers,
    # either in same or other clusters
    brokerClientTlsEnabled=false
    # The authentication class used by the broker's own client when brokers connect to
    # each other. It can usually be the same class that ordinary clients use.
    # In a real project the broker side needs to tell the two kinds of connection apart
    # and authenticate each of them properly.
    brokerClientAuthenticationPlugin=auth.client.VVAuthentication
    brokerClientAuthenticationParameters=
    brokerClientTrustCertsFilePath=
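Because broker.conf is a plain key=value file, a quick pre-flight check before restarting the broker can catch a disabled flag or an empty provider list. The sketch below uses only java.util.Properties; BrokerConfCheckSketch and enabledProviders are hypothetical helper names, not part of Pulsar, and the check only reads the two keys discussed above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Hypothetical pre-flight helper; not part of Pulsar. */
final class BrokerConfCheckSketch {

    /**
     * Returns the provider class names that would be active for the given
     * broker.conf text, or an empty list if authentication is disabled.
     */
    static List<String> enabledProviders(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String enabled = props.getProperty("authenticationEnabled", "false").trim();
        if (!Boolean.parseBoolean(enabled)) {
            return List.of();
        }
        // The provider list is comma separated; ignore blanks and stray spaces.
        return Arrays.stream(props.getProperty("authenticationProviders", "").split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String conf = "# Enable authentication\n"
                + "authenticationEnabled=true\n"
                + "authenticationProviders=auth.server.VVAuthenticationProvider\n";
        System.out.println(enabledProviders(conf));
    }
}
```

In practice the text would come from the real broker.conf file rather than an inline string.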

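3.2 Implement the authentication handler classes

Authentication has a server side and a client side. The server side is the broker; the client side is the producer or consumer that we write ourselves. The server side must implement the org.apache.pulsar.broker.authentication.AuthenticationProvider interface, and the client side must implement org.apache.pulsar.client.api.Authentication.
The server-side and client-side code follow in turn.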

Server-side authentication code:

    package auth.server;

    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
    import org.apache.pulsar.broker.authentication.AuthenticationProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import javax.naming.AuthenticationException;
    import java.io.IOException;
    import java.util.Set;

    /**
     * Broker-side authentication provider (demo only).
     *
     * @author cc
     * @date 2021/7/27 14:19
     */
    public class VVAuthenticationProvider implements AuthenticationProvider {
        private static final Logger log = LoggerFactory.getLogger(VVAuthenticationProvider.class);

        // Must match the name returned by the client-side Authentication class.
        private static final String methodName = "vv_auth_v2";

        // HTTP header that carries the token on admin (HTTP) requests.
        private String header = "vv_auth";

        @Override
        public void initialize(ServiceConfiguration config) throws IOException {
            log.info(methodName + " initialize");
            if (config == null) {
                return;
            }

            // Log the configured super-user roles so the setup is easy to verify.
            Set<String> superRoles = config.getSuperUserRoles();
            if (superRoles == null) {
                return;
            }
            for (String role : superRoles) {
                log.info(methodName + " initialize " + role);
            }
        }

        @Override
        public String getAuthMethodName() {
            log.info(methodName + " getAuthMethodName");
            return methodName;
        }

        /**
         * Returns the role of the connecting client. This demo trusts whatever
         * token the client sends and uses it directly as the role name; a real
         * provider must verify the token here.
         */
        @Override
        public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
            log.info(methodName + " authenticate");

            String roleToken;
            if (authData.hasDataFromCommand()) {
                // Binary protocol: the token arrives in the connect command.
                roleToken = authData.getCommandData();
            } else if (authData.hasDataFromHttp()) {
                // HTTP: the token arrives in a request header.
                roleToken = authData.getHttpHeader(header);
            } else {
                throw new AuthenticationException("Authentication data source does not have a role token");
            }
            // getHttpHeader returns null when the header is absent.
            if (roleToken == null || roleToken.isEmpty()) {
                throw new AuthenticationException("Role token is missing or empty");
            }

            log.info(methodName + " authenticate " + roleToken);
            return roleToken;
        }

        @Override
        public void close() throws IOException {
            log.info(methodName + " close");
        }
    }

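The provider above returns whatever token the client sends as the role, which is fine for tracing the call flow but verifies nothing. A minimal sketch of the missing check might look like the following. TokenCheckSketch, resolveRole, and the token table are hypothetical; a real deployment would load tokens from a secure store or verify a signature instead of using a hard-coded map.

```java
import javax.naming.AuthenticationException;
import java.util.Map;

/** Hypothetical helper that a provider's authenticate() could delegate to. */
final class TokenCheckSketch {
    // Assumed token -> role table, hard-coded purely for illustration.
    private static final Map<String, String> ROLE_BY_TOKEN =
            Map.of("s3cr3t-token-a", "vv-role");

    /** Maps a token to its role, rejecting missing or unknown tokens. */
    static String resolveRole(String token) throws AuthenticationException {
        if (token == null || token.isEmpty()) {
            throw new AuthenticationException("Missing token");
        }
        String role = ROLE_BY_TOKEN.get(token);
        if (role == null) {
            throw new AuthenticationException("Unknown token");
        }
        return role;
    }

    public static void main(String[] args) throws AuthenticationException {
        System.out.println(resolveRole("s3cr3t-token-a"));
    }
}
```

It throws javax.naming.AuthenticationException, the same exception type that the authenticate method above declares, so the helper could be called from that method without extra wrapping.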
Client-side authentication code:

    package auth.client;

    import org.apache.pulsar.client.api.Authentication;
    import org.apache.pulsar.client.api.AuthenticationDataProvider;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.util.Map;

    /**
     * Client-side authentication plugin (demo only).
     *
     * @author cc
     * @date 2021/7/27 14:00
     */
    public class VVAuthentication implements Authentication {
        private static final Logger log = LoggerFactory.getLogger(VVAuthentication.class);

        // Must match the name returned by the broker-side provider.
        private static final String methodName = "vv_auth_v2";

        @Override
        public String getAuthMethodName() {
            log.info(methodName + " getAuthMethodName");
            return methodName;
        }

        @Override
        public AuthenticationDataProvider getAuthData() throws PulsarClientException {
            log.info(methodName + " getAuthData");
            return new VVAuthenticationDataProvider();
        }

        @Override
        public void configure(Map<String, String> authParams) {
            log.info(methodName + " configure");
            if (authParams == null) {
                return;
            }

            // Demo only: logging parameter values would leak secrets in production.
            authParams.forEach((key, value) -> {
                log.info(methodName + " configure " + key + "=" + value);
            });
        }

        @Override
        public void start() throws PulsarClientException {
            log.info(methodName + " start");
        }

        @Override
        public void close() throws IOException {
            log.info(methodName + " close");
        }
    }

The AuthenticationDataProvider returned by getAuthData():

    package auth.client;

    import org.apache.pulsar.client.api.AuthenticationDataProvider;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    /**
     * Supplies the token that the client sends to the broker (demo only).
     *
     * @author cc
     * @date 2021/7/27 14:08
     */
    public class VVAuthenticationDataProvider implements AuthenticationDataProvider {
        private static final Logger log = LoggerFactory.getLogger(VVAuthenticationDataProvider.class);
        private static final String methodName = "vv_auth_v2";

        // Header name read by the broker-side provider on HTTP requests.
        private String header = "vv_auth";

        // Hard-coded token; the demo broker treats it as the role name.
        private String token = "vv-role";

        @Override
        public boolean hasDataForHttp() {
            log.info(methodName + " hasDataForHttp");
            return true;
        }

        @Override
        public Set<Map.Entry<String, String>> getHttpHeaders() throws Exception {
            log.info(methodName + " getHttpHeaders");
            Map<String, String> headers = new HashMap<>();
            headers.put(header, token);
            return headers.entrySet();
        }

        @Override
        public boolean hasDataFromCommand() {
            log.info(methodName + " hasDataFromCommand");
            return true;
        }

        @Override
        public String getCommandData() {
            log.info(methodName + " getCommandData");
            return token;
        }
    }
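Note that the client class and the broker class both return the same string, "vv_auth_v2", from getAuthMethodName. As far as I understand it, the broker uses the method name announced by the client to pick which configured provider handles the connection, so the two must agree. The sketch below models that lookup in plain Java under that assumption. ProviderRegistrySketch is hypothetical and is not how the broker is actually implemented; the Function stands in for a provider's authenticate method.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical model of "select a provider by auth method name". */
final class ProviderRegistrySketch {
    // Method name -> function that turns auth data into a role.
    private final Map<String, Function<String, String>> providers = new HashMap<>();

    /** Registers a provider under the name its getAuthMethodName() would return. */
    void register(String methodName, Function<String, String> provider) {
        providers.put(methodName, provider);
    }

    /** Dispatches to the provider registered for the client's method name. */
    String authenticate(String clientMethodName, String authData) {
        Function<String, String> provider = providers.get(clientMethodName);
        if (provider == null) {
            throw new IllegalArgumentException(
                    "Unsupported authentication method: " + clientMethodName);
        }
        return provider.apply(authData);
    }

    public static void main(String[] args) {
        ProviderRegistrySketch registry = new ProviderRegistrySketch();
        // The demo provider echoes the token back as the role.
        registry.register("vv_auth_v2", token -> token);
        System.out.println(registry.authenticate("vv_auth_v2", "vv-role"));
    }
}
```

If the names differ on the two sides, the lookup in this model fails before any token is inspected, which is a useful thing to rule out first when a connection is rejected.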

Once the code is written, package it with Maven, put the resulting jar into Pulsar's lib directory, and restart the broker.

4. Test

    package auth;

    import auth.client.VVAuthentication;
    import org.apache.pulsar.client.api.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeUnit;

    /**
     * Sends ten messages and reads them back through the custom authentication plugin.
     *
     * @author cc
     * @date 2021/7/19 10:47
     */
    public class AuthTest {
        private static final Logger log = LoggerFactory.getLogger(AuthTest.class);

        public static void main(String[] args) throws Exception {
            AuthTest main = new AuthTest();
            main.run();
        }

        String pulsarUrl = "pulsar://x.x.x.x:6650";
        String topic = "persistent://tenant_vv/ns1/auth_test";
        Authentication authentication = new VVAuthentication();

        void run() throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .authentication(authentication)
                    .serviceUrl(pulsarUrl)
                    .build();

            send(client);
            consume(client);

            System.out.println("connect succeeded");
            client.close();
        }

        void consume(PulsarClient client) throws Exception {
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionName("consumer-test")
                    // A new subscription starts at the latest position by default,
                    // which would skip the messages sent before subscribing.
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();
            while (true) {
                // receive() without a timeout blocks forever and never returns null;
                // the timed variant returns null once the topic has been drained.
                Message<byte[]> m = consumer.receive(5, TimeUnit.SECONDS);
                if (m == null) {
                    break;
                }
                log.info("recv " + new String(m.getData(), StandardCharsets.UTF_8));
                consumer.acknowledge(m);
            }
            consumer.close();
        }

        void send(PulsarClient client) throws Exception {
            Producer<byte[]> p = client.newProducer()
                    .topic(topic)
                    .create();
            for (int i = 0; i < 10; i++) {
                p.newMessage()
                        .key("aaa")
                        .value(("hello " + i).getBytes(StandardCharsets.UTF_8))
                        .send();
                log.info("send " + i);
                Thread.sleep(1000);
            }
            p.flush();
            p.close();
            System.out.println("send done");
        }
    }
