我是靠谱客的博主 和谐马里奥,这篇文章主要介绍Spring Boot异步消息之AMQP讲解及实战(附源码)使用RabbitMQ实现发布/订阅异步消息模式,现在分享给大家,希望可以做个参考。

觉得有帮助请点赞关注收藏~~~

AMQP(高级消息队列协议)是一个提供统一消息服务的应用层标准高级消息队列协议。是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可 传递消息,并不受客户端/中间件的不同产品,不同开发语言等条件的限制。

下面实现主要用RabbitMQ讲解AMQP实例,因此需要事先安装RabbitMQ和erlang语言

erlang下载地址 https://www.erlang.org/downloads

RabbitMQ下载地址 https://www.rabbitmq.com/download.html 

使用RabbitMQ实现发布/订阅异步消息模式

1:创建发布者应用ch8_2Sender

2:在pom.xml文件中添加依赖

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
<?xml version="1.0" encoding="UTF-8"?> -<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"> <modelVersion>4.0.0</modelVersion> -<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ch</groupId> <artifactId>ch8_2Sender</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ch8_2Sender</name> <description>Demo project for Spring Boot</description> +<properties> -<dependencies> -<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> +<dependency> -<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> -<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> -<build> -<plugins> -<plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

 3:创建Weather实体类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.ch.ch8_2Sender.entity; import java.io.Serializable; public class Weather implements Serializable{ private static final long serialVersionUID = -8221467966772683998L; private String id; private String city; private String weatherDetail; public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getWeatherDetail() { return weatherDetail; } public void setWeatherDetail(String weatherDetail) { this.weatherDetail = weatherDetail; } public String getId() { return id; } public void setId(String id) { this.id = id; } @Override public String toString() { return "Weather [id=" + id + ", city=" + city + ", weatherDetail=" + weatherDetail + "]"; } }

4:重写Ch82SenderApplication主类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.ch.ch8_2Sender; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import com.ch.ch8_2Sender.entity.Weather; import com.fasterxml.jackson.databind.ObjectMapper; @SpringBootApplication public class Ch82SenderApplication implements CommandLineRunner{ @Autowired private ObjectMapper objectMapper; @Autowired RabbitTemplate rabbitTemplate; public static void main(String[] args) { SpringApplication.run(Ch82SenderApplication.class, args); } /** * 定义发布者 */ @Override public void run(String... args) throws Exception { //定义消息对象 Weather weather = new Weather(); weather.setId("010"); weather.setCity("北京"); weather.setWeatherDetail("今天晴到多云,南风5-6级,温度19-26°C"); //指定Json转换器,Jackson2JsonMessageConverter默认将消息转换成byte[]类型的消息 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); //objectMapper将weather对象转换为JSON字节数组 Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(weather)) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) .build(); // 消息唯一ID CorrelationData correlationData = new CorrelationData(weather.getId()); //使用已封装好的convertAndSend(String exchange , String routingKey , Object message, CorrelationData correlationData) //将特定的路由key发送消息到指定的交换机 rabbitTemplate.convertAndSend( "weather-exchange", //分发消息的交换机名称 "weather.message", //用来匹配消息的路由Key msg, //消息体 correlationData); } }

5:创建订阅者应用ch8_2Receiver-1

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.ch.ch8_2Receiver1; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; /** * 定义订阅者Receiver1 */ @Component public class Receiver1 { @Autowired private ObjectMapper objectMapper; @RabbitListener( bindings = @QueueBinding( //队列名weather-queue1保证和别的订阅者不一样 value = @Queue(value = "weather-queue1",durable = "true"), //weather-exchange与发布者的交换机名相同 exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"), //weather.message与发布者的消息的路由Key相同 key = "weather.message" ) ) @RabbitHandler public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{ System.out.println("-----------订阅者Receiver1接收到消息--------"); //将JSON字节数组转换为Weather对象 Weather w=objectMapper.readValue(weatherMessage, Weather.class); System.out.println("Receiver1收到的消息内容:"+w); } }

6:创建订阅者应用ch8_2Receiver-2

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.ch.ch8_2Receiver1; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.ObjectMapper; /** * 定义订阅者Receiver2 */ @Component public class Receiver2 { @Autowired private ObjectMapper objectMapper; @RabbitListener( bindings = @QueueBinding( //队列名weather-queue2保证和别的订阅者不一样 value = @Queue(value = "weather-queue2",durable = "true"), //weather-exchange与发布者的交换机名相同 exchange = @Exchange(value = "weather-exchange",durable = "true",type = "topic"), //weather.message与发布者的消息的路由Key相同 key = "weather.message" ) ) @RabbitHandler public void receiveWeather(@Payload byte[] weatherMessage)throws Exception{ System.out.println("-----------订阅者Receiver2接收到消息--------"); Weather w=objectMapper.readValue(weatherMessage, Weather.class); //将JSON字节数组转换为Weather对象 System.out.println("Receiver2收到的消息内容:"+w); } }

接下来分别运行发布者和订阅者的主类即可,发现一个发布者发布的消息可以被多个订阅者订阅。

最后

以上就是和谐马里奥最近收集整理的关于Spring Boot异步消息之AMQP讲解及实战(附源码)使用RabbitMQ实现发布/订阅异步消息模式的全部内容,更多相关Spring内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(128)

评论列表共有 0 条评论

立即
投稿
返回
顶部