This post, by the 靠谱客 blogger 独特长颈鹿, covers [RabbitMQ] delay queues implemented with the delayed_message_exchange plugin. It is shared here as a reference.

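I. Installing the rabbitmq_delayed_message_exchange plugin

  • Official download page: https://www.rabbitmq.com/community-plugins.html

(1) Installing directly on Linux:

Note:

The delayed-message plugin can only be installed on RabbitMQ nodes of the Disc (disk) type. RAM (in-memory) nodes cannot use it.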

  • 1. Run rpm -qa|grep rabbit to check whether RabbitMQ is installed on this Linux host

    [root@linux]# rpm -qa|grep rabbit
    rabbitmq-server-3.6.5-1.noarch
  • 2. Run rpm -ql rabbitmq-server-3.6.5-1.noarch to find the RabbitMQ installation paths

    [root@linux]# rpm -ql rabbitmq-server-3.6.5-1.noarch
    /etc/logrotate.d/rabbitmq-server
    /etc/rabbitmq
    /etc/rc.d/init.d/rabbitmq-server
    /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
    /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server-ha
    /usr/lib/rabbitmq/bin/rabbitmq-defaults
    /usr/lib/rabbitmq/bin/rabbitmq-env
    /usr/lib/rabbitmq/bin/rabbitmq-plugins
    ...
  • 3. Run rabbitmq-plugins list to see which plugins are already installed

    [root@linux]# rabbitmq-plugins list
     Configured: E = explicitly enabled; e = implicitly enabled
     | Status:   * = running on rabbit@apec-198
     |/
    [e*] amqp_client                        3.6.5
    [  ] cowboy                             1.0.3
    [  ] cowlib                             1.0.1
    [e*] mochiweb                           2.13.1
    [  ] rabbitmq_amqp1_0                   3.6.5
    [  ] rabbitmq_auth_backend_ldap         3.6.5
    [  ] rabbitmq_auth_mechanism_ssl        3.6.5
    [  ] rabbitmq_consistent_hash_exchange  3.6.5
    [  ] rabbitmq_event_exchange            3.6.5
    [  ] rabbitmq_federation                3.6.5
    [  ] rabbitmq_federation_management     3.6.5
    [  ] rabbitmq_jms_topic_exchange        3.6.5
    [E*] rabbitmq_management                3.6.5
    [e*] rabbitmq_management_agent          3.6.5
    [  ] rabbitmq_management_visualiser     3.6.5
    [  ] rabbitmq_mqtt                      3.6.5
    [  ] rabbitmq_recent_history_exchange   1.2.1
    [  ] rabbitmq_sharding                  0.1.0
    [  ] rabbitmq_shovel                    3.6.5
    [  ] rabbitmq_shovel_management         3.6.5
    [  ] rabbitmq_stomp                     3.6.5
    [  ] rabbitmq_top                       3.6.5
    [  ] rabbitmq_tracing                   3.6.5
    [  ] rabbitmq_trust_store               3.6.5
    [e*] rabbitmq_web_dispatch              3.6.5
    [  ] rabbitmq_web_stomp                 3.6.5
    [  ] rabbitmq_web_stomp_examples        3.6.5
    [  ] sockjs                             0.3.4
    [e*] webmachine                         1.10.3
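  • 4. Go to /rabbitmq/lib/rabbitmq_server-3.6.5/plugins and upload the plugin file downloaded earlier (rabbitmq_delayed_message_exchange-20171201-3.7.x.ez). Note that this file name is the 3.7.x build; download the build that matches your RabbitMQ version.
  • 5. Enable the plugin: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 6. Restart RabbitMQ so the plugin takes effect: service rabbitmq-server restart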
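
After step 6 it can be useful to confirm from code that the plugin is both enabled and running. The sketch below parses text in the layout printed by rabbitmq-plugins list in step 3 (flags in brackets, then the plugin name, then the version) and applies the legend from that output: E or e means enabled, * means running. The names PluginListCheck and isEnabledAndRunning are illustrative and not part of any RabbitMQ tooling, and the parser assumes the 3.6.x-style layout shown above; other versions may print a different format.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PluginListCheck {
    // Matches lines such as "[E*] rabbitmq_management 3.6.5":
    // group 1 = flags between the brackets, group 2 = plugin name.
    private static final Pattern LINE =
            Pattern.compile("^\\s*\\[([^\\]]*)\\]\\s+(\\S+)\\s+\\S+\\s*$");

    private PluginListCheck() {}

    /** Returns true only if the plugin is listed, enabled (E or e) and running (*). */
    static boolean isEnabledAndRunning(String listOutput, String pluginName) {
        for (String line : listOutput.split("\\R")) {
            Matcher m = LINE.matcher(line);
            if (m.matches() && m.group(2).equals(pluginName)) {
                String flags = m.group(1);
                boolean enabled = flags.contains("E") || flags.contains("e");
                boolean running = flags.contains("*");
                return enabled && running;
            }
        }
        return false; // the plugin does not appear in the list at all
    }

    public static void main(String[] args) {
        // Two lines taken from the listing in step 3.
        String sample = String.join("\n",
                "[E*] rabbitmq_management 3.6.5",
                "[  ] rabbitmq_mqtt 3.6.5");
        System.out.println(isEnabledAndRunning(sample, "rabbitmq_management"));
        System.out.println(isEnabledAndRunning(sample, "rabbitmq_mqtt"));
    }
}
```

In practice the input would be the captured output of rabbitmq-plugins list, and the name to look up would be rabbitmq_delayed_message_exchange.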

(2) Installing in a Docker container on Linux:

  • 1. List the Docker containers and find the RabbitMQ one: docker ps -a

    [root@linux ~]# docker ps -a
    CONTAINER ID   IMAGE                    COMMAND                  CREATED       STATUS       PORTS                                            NAMES
    8efd6f3add3c   chenchuxin/dubbo-admin   "catalina.sh run"        6 weeks ago   Up 5 weeks   0.0.0.0:9090->8080/tcp                           dubbo-admin
    6939b83d0942   zookeeper                "/docker-entrypoint.…"   6 weeks ago   Up 5 weeks                                                    zookeeper01
    2aec2548a9f8   525bd2016729             "docker-entrypoint.s…"   6 weeks ago   Up 5 weeks   0.0.0.0:27017->27017/tcp                         docker_mongodb
    a6da9a3f6ca2   mongo-express            "tini -- /docker-ent…"   6 weeks ago   Up 5 weeks   0.0.0.0:8081->8081/tcp                           agitated_tu
    d3dfb1bbfda4   mongo:4.0.4              "docker-entrypoint.s…"   6 weeks ago   Up 5 weeks   27017/tcp                                        mymongo
    389a673177ea   portainer/portainer      "/portainer"             7 weeks ago   Up 5 weeks   0.0.0.0:9000->9000/tcp                           prtainer-test
    840e143489ac   752be83a5396             "/docker-entrypoint.…"   7 weeks ago   Up 5 weeks   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   es2
    c13f5fba3a1a   mysql:5.7                "docker-entrypoint.s…"   7 weeks ago   Up 4 weeks   0.0.0.0:3306->3306/tcp, 33060/tcp                mysql
    09512f961186   2888deb59dfc             "docker-entrypoint.s…"   7 weeks ago   Up 5 weeks   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq3.7.7
    a817c6e95c4b   redis:4.0                "docker-entrypoint.s…"   7 weeks ago   Up 5 weeks   0.0.0.0:7001->6379/tcp                           redis7001
  • 2. Upload the rabbitmq_delayed_message_exchange-20171201-3.7.x.ez plugin file to a directory on the Linux host
  • 3. Copy the plugin file into the RabbitMQ Docker container

    [root@linux ~]# docker cp rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbitmq3.7.7:/plugins
  • 4. Open a shell inside the RabbitMQ Docker container: docker exec -it rabbitmq3.7.7 bash

    [root@linux ~]# docker exec -it rabbitmq3.7.7 bash
    root@myRabbit:/#

  • 5. Check the plugin list: rabbitmq-plugins list
  • 6. Enable the plugin

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange

II. Integration

  • Declare an exchange of type x-delayed-message and set its x-delayed-type argument:
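
    // config
    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class DelayMessageMqConfig {

        // The original post does not show this constant's value; the one below is a placeholder.
        public static final String DELAYED_EXCHANGE_NAME = "delay.queue.job.delay.exchange";
        public static final String DELAYED_ROUTING_KEY = "delay.queue.job.delay.routingKey";
        // Exchange type provided by the rabbitmq_delayed_message_exchange plugin.
        public static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";

        @Bean
        public CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<>(2);
            // Once the delay has elapsed, route the message the way a direct exchange would.
            args.put("x-delayed-type", "direct");
            // durable = true, autoDelete = false
            return new CustomExchange(DELAYED_EXCHANGE_NAME, DELAYED_EXCHANGE_TYPE, true, false, args);
        }
    }

    // message body
    import lombok.Data;
    import lombok.EqualsAndHashCode;
    import org.springframework.amqp.core.MessageDeliveryMode;

    /**
     * Created by KINGFS on 2020/8/6.
     *
     * @author KINGFS
     */
    @Data
    @EqualsAndHashCode(callSuper = false)
    public class DelayMessage {
        /** Delivery mode; persistent by default. */
        private MessageDeliveryMode deliveryMode = MessageDeliveryMode.PERSISTENT;
        /** Message expiration (TTL), as used by the TTL-based approach. */
        private String expiration;
        /** Delivery delay in milliseconds; sent to the plugin as the x-delay header. */
        private Integer delay;
        /** Message payload. */
        private String message;
    }

    // sender
    import lombok.AccessLevel;
    import lombok.RequiredArgsConstructor;
    import lombok.experimental.FieldDefaults;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;

    /**
     * Created by KINGFS on 2020/8/6.
     *
     * @author KINGFS
     */
    @Component
    @RequiredArgsConstructor
    @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
    public class RabbitMqSender {

        RabbitTemplate rabbitTemplate;

        // Sends through the TTL-based setup; DirectDelayMqConfig is not shown in this post.
        public void sendDirectDelayMessage(DelayMessage delayMessage) {
            sendMessage(DirectDelayMqConfig.DELAY_EXCHANGE_NAME,
                    DirectDelayMqConfig.DELAY_QUEUE_MESSAGE_TTL_ROUTING_KEY, delayMessage);
        }

        // Sends through the x-delayed-message exchange declared in DelayMessageMqConfig.
        public void sendXDelayMessage(DelayMessage delayMessage) {
            sendMessage(DelayMessageMqConfig.DELAYED_EXCHANGE_NAME,
                    DelayMessageMqConfig.DELAYED_ROUTING_KEY, delayMessage);
        }

        private void sendMessage(String exchangeName, String routingKey, DelayMessage message) {
            // The post-processor receives the converted Message and sets its properties before publishing.
            rabbitTemplate.convertAndSend(exchangeName, routingKey, message.getMessage(), msg -> {
                MessageProperties messageProperties = msg.getMessageProperties();
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                messageProperties.setExpiration(message.getExpiration());
                messageProperties.setDelay(message.getDelay());
                return msg;
            });
        }
    }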
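
The delay field in DelayMessage is passed to setDelay, which Spring AMQP sends as the x-delay header, and the plugin reads that value as milliseconds. Callers usually start from a duration or a target delivery time, so a small conversion helper can be handy. The sketch below is one way to write it; DelayValues, fromDuration and until are hypothetical names that belong neither to Spring AMQP nor to the plugin, and rejecting negative or oversized values is an assumed policy rather than something the original code does.

```java
import java.time.Duration;
import java.time.Instant;

final class DelayValues {
    private DelayValues() {}

    /** Converts a duration to whole milliseconds that fit in an Integer. */
    static Integer fromDuration(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative: " + delay);
        }
        long millis = delay.toMillis();
        if (millis > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("delay does not fit in an Integer: " + millis + " ms");
        }
        return (int) millis;
    }

    /** Delay from 'now' until 'deliverAt'; a target time in the past yields 0. */
    static Integer until(Instant now, Instant deliverAt) {
        Duration remaining = Duration.between(now, deliverAt);
        return remaining.isNegative() ? 0 : fromDuration(remaining);
    }
}
```

With the Lombok-generated setter on DelayMessage, a 30-second delay could then be requested with delayMessage.setDelay(DelayValues.fromDuration(Duration.ofSeconds(30))).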
  • Corrections and suggestions for anything that is wrong or poorly written are welcome. Many thanks!
