简单示例
在消息发送方给队列设置最大优先级,同时给消息设置优先级,优先级大小 0-255
1.消息发送方
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class RabbitmqProducer { public static final String ip = "192.168.*.*"; public static final int port = 5672; public static final String username = "guest"; public static final String password = "guest"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip); // 创建信道,信道是在connection上虚拟出来的连接 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 声明交换器 channel.exchangeDeclare("exchange_priority","direct",true); // 声明队列,加入最大优先级参数 x-max-priority Map<String,Object> param = new HashMap<>(); param.put("x-max-priority", 10); // 必须给队列设置优先级,否则消息设置了优先级也不生效 channel.queueDeclare("queue_priority", true, false, false, param); // 给队列绑定交换器 channel.queueBind("queue_priority", "exchange_priority", "rk_priority"); // 发送消息 for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); if(i%2!=0) builder.priority(5); AMQP.BasicProperties properties = builder.build(); // 给消息设定优先级并发送 channel.basicPublish("exchange_priority","rk_priority",properties,("messages-"+i).getBytes()); } channel.close(); connection.close(); } }
2.消息接收方
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitmqConsumer { public static final String ip = "192.168.*.*"; public static final int port = 5672; public static final String username = "guest"; public static final String password = "guest"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setPassword(password); connectionFactory.setUsername(username); connectionFactory.setPort(port); connectionFactory.setHost(ip); // 声明信道 Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); Integer priority = delivery.getProperties().getPriority(); // 优先级 System.out.println(" [x] Received '"+ message + "'"+" === priority: " + priority ); }; // 接受消息 channel.basicConsume("queue_priority", true, deliverCallback, consumerTag -> { }); } }
结合 SpringCloud Stream 使用
1.启动类添加注解
复制代码
1
2@EnableBinding({DemoPriorityBinding.class})
2.定义一个binding接口
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13public interface DemoPriorityBinding { String OUTPUT = "abc-center-demo-priority-output"; String INPUT = "abc-center-demo-priority-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }
3.bootstrap.properties配置
复制代码
1
2
3
4
5
6
7spring.cloud.stream.bindings.abc-center-demo-priority-input.destination=abc-center-demo-priority spring.cloud.stream.bindings.abc-center-demo-priority-input.group=${spring.application.name} spring.cloud.stream.bindings.abc-center-demo-priority-output.destination=abc-center-demo-priority # 这里consumer.max-priority一定要有优先级的设置,这里即是给队列设置优先级 spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-input.consumer.max-priority=100 spring.cloud.stream.rabbit.bindings.abc-center-demo-priority-output.producer.max-priority=100
4.发送消息
复制代码
1
2
3
4
5
6
7
8
9@Autowired private DemoPriorityBinding demoPriorityBinding; public void sendPriorityMsg(String msg, Integer priority) { // 这里通过setHeader可以给消息设置优先级,priority参数就是优先级的意思 demoPriorityBinding.output().send(MessageBuilder.withPayload(msg).setHeader("priority", priority).build()); log.info("消息发送成功"); }
5.接收消息
复制代码
1
2
3
4
5
6
7@StreamListener(DemoPriorityBinding.INPUT) public void acquirePriorityMsg(String msg) throws Exception{ // 这里休眠20秒,模拟业务处理,方便让消息排队 Thread.sleep(20000); log.info("==========接收优先级消息:"+msg); }
注意
检查队列是否有优先级设置,队列设置完优先级会有如图显示
最后
以上就是想人陪老虎最近收集整理的关于Rabbitmq使用优先级队列实现消息插队的全部内容,更多相关Rabbitmq使用优先级队列实现消息插队内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复