1、增加rabbitmq的依赖包
复制代码
1
2
3
4
5
6
7
8
9
10
11<!-- amqp 依赖包 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.2.RELEASE</version> </dependency>
2、rabbitmq.properties文件中配置
复制代码
1
2
3
4
5
6
7
8
9
10
11
12rabbit.hosts= rabbit.username=liam rabbit.password=liam rabbit.port=5672 rabbit.virtualHost=/ # 统一XML配置中易变部分的命名 rabbit.vhost=liam_host rabbit.channelCacheSize=8 rabbit.queue=rabbitmq_test2 rabbit.exchange=rabbit_driect_mq_exchange rabbit.routingKey=rabbitmq_test2 rabbit.publisher-confirms=true
复制代码
1
3、RabbitMq的工厂连接和模板创建
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36@Configuration public class RabbitConfig { @Value("${rabbit.hosts}") private String host; @Value("${rabbit.port}") private String port; @Value("${rabbit.username}") private String username; @Value("${rabbit.password}") private String password; @Value("${rabbit.publisher-confirms}") private Boolean publisherConfirms; @Value("${rabbit.vhost}") private String virtualHost; //创建工厂连接 @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(this.host); connectionFactory.setUsername(this.username); connectionFactory.setPassword(this.password); connectionFactory.setVirtualHost(this.virtualHost); connectionFactory.setPublisherConfirms(this.publisherConfirms); //必须要设置 return connectionFactory; } //rabbitmq的模板配置 @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(this.connectionFactory()); //template.setConfirmCallback(); 设置消息确认 //template.setReturnCallback(); return template; } }
4、创建交换机、创建队列、绑定交换机和队列
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47@Configuration public class RabbitExchangeConfig { //直连交换机 @Bean public DirectExchange defaultExchange() { return new DirectExchange(RabbitConstant.EXCHANGE_NAME); } //队列 @Bean public Queue queue() { return QueueBuilder.durable(RabbitConstant.QUEUE_NAME).build(); } //绑定 @Bean public Binding binding() { return BindingBuilder.bind(this.queue()).to(this.defaultExchange()).with(RabbitConstant.ROUTING_KEY); } /*@Bean public Binding binding() { return declare(new Binding(helloWorldQueue(), defaultDirectExchange())); }*/ /* @Bean public TopicExchange helloExchange() { return declare(new TopicExchange("hello.world.exchange")); }*/ /* public Queue declareUniqueQueue(String namePrefix) { Queue queue = new Queue(namePrefix + "-" + UUID.randomUUID()); rabbitAdminTemplate().declareQueue(queue); return queue; } // if the default exchange isn't configured to your liking.... @Bean Binding declareP2PBinding(Queue queue, DirectExchange exchange) { return declare(new Binding(queue, exchange, queue.getName())); } @Bean Binding declarePubSubBinding(String queuePrefix, FanoutExchange exchange) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange)); } @Bean Binding declarePubSubBinding(UniqueQueue uniqueQueue, TopicExchange exchange) { return declare(new Binding(uniqueQueue, exchange)); } @Bean Binding declarePubSubBinding(String queuePrefix, TopicExchange exchange, String routingKey) { return declare(new Binding(declareUniqueQueue(queuePrefix), exchange, routingKey)); }*/ }
5、生产者
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22@Component public class RabbitSender implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容 this.rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_NAME, RabbitConstant.ROUTING_KEY, content, correlationId); } //回调 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause); } } }
6、消费者
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26@Configuration public class RabbitReceive { @Autowired private RabbitConfig rabbitConfig; @Autowired private RabbitExchangeConfig rabbitExchangeConfig; @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitConfig.connectionFactory()); container.setQueues(rabbitExchangeConfig.queue()); //设置要监听的队列 container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 } }); return container; } }
参考:
Spring-@value用法详解
spring boot 整合 RabbitMq (注解)
最后
以上就是冷静帆布鞋最近收集整理的关于Spring整合RabbitMQ(二)之注解实现参考: Spring-@value用法详解的全部内容,更多相关Spring整合RabbitMQ(二)之注解实现参考内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复