<!-- mqtt依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
-- application.yml配置
mqtt:
client:
url: tcp://ip:port
username: admin
password: public
topic: /test_topic/#
completionTimeout: 3000
clientId: 123456
enabled: true
qos: 0
-- 创建mq连接信息实体
@Component
@Data
public class MqttBase {
@Value("${spring.mqtt.client.url}")
private String mqttHost;
@Value("${spring.mqtt.client.username}")
private String mqttUserName;
@Value("${spring.mqtt.client.password}")
private String mqttPwd;
@Value("${spring.mqtt.client.topic}")
private String topic;
@Value("${spring.mqtt.client.completionTimeout}")
private Integer completionTimeout;
@Value("${spring.mqtt.client.qos}")
private Integer qos;
@Value("${spring.mqtt.client.clientId}")
private String clientId;
}
-- mq工具类
@Configuration
@Slf4j
public class MqttConfig {
@Autowired
private MqttBase mqttBase;
/*****
* 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用SSL验证,也在这里设置。
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[]{mqttBase.getMqttHost()});
options.setUserName(mqttBase.getMqttUserName());
options.setPassword(mqttBase.getMqttPwd().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttBase.getClientId() + System.currentTimeMillis(),
mqttClientFactory(), mqttBase.getTopic());
adapter.setCompletionTimeout(mqttBase.getCompletionTimeout());
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(mqttBase.getQos());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
//ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return message -> {
String payload = message.getPayload().toString();
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
// 根据topic分别进行消息处理。
log.info(topic + "主题,消息为{}",payload);
switch (topic) {
case YUAN_AN_ELECTRIC_TOPIC:
break;
case YUAN_AN_ENVIRONMENT_TOPIC:
break;
default:
log.info(topic + ": 未处理消息" + payload);
}
};
}
// 发送消息
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/*****
* 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler outbound() {
// 在这里进行mqttOutboundChannel的相关设置
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(mqttBase.getClientId() + System.currentTimeMillis(), mqttClientFactory());
messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
// 定义重载方法,用于消息发送
void sendToMqtt(String payload);
// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
// 指定topic和qos进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
}
最后
以上就是疯狂墨镜最近收集整理的关于Mqtt消息发送、主题订阅的全部内容,更多相关Mqtt消息发送、主题订阅内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复