我是靠谱客的博主 碧蓝牛排,这篇文章主要介绍【RocketMQ中的顺序消息、生产者顺序生产消息、消费者顺序消费消息、顺序包括全局有序和分块有序、代码样例实战】,现在分享给大家,希望可以做个参考。

一.知识回顾

【0.RocketMQ专栏的内容在这里哟,帮你整理好了,更多内容持续更新中】
【1.Docker安装部署RocketMQ消息中间件详细教程】
【2.RocketMQ生产者发送消息的三种方式:发送同步消息、异步消息、单向消息&案例实战&详细学习流程】
【3.RocketMQ消费者进行消费消息的二种方式:集群消费、广播消费&案例实战&详细学习流程&集群消费模、广播模式的适用场景&注意事项】

二.RocketMQ中的顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

注意:生产消息时在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

2.1 全局有序

2.1.1 什么是全局有序?

如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序。
在这里插入图片描述

2.1.2 全局有序实现的关键之处

全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。

2.1.3 生产者实现代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class AsyncProducer { public static void main(String[] args) throws Exception{ // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("group_test"); // 设置NameServer的地址 producer.setNamesrvAddr("设置为自己主机IP地址:9876"); // 启动Producer实例 producer.start(); for (int i = 0; i < 10; i++) { final int index = i; // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest", "TagA", "OrderID888", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendCallback接收异步返回结果的回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%s%n", sendResult); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } Thread.sleep(10000); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }

2.1.4 消费者实现代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class BalanceComuser { public static void main(String[] args) throws Exception { // 实例化消费者,指定组名: TopicTest 10条消息 group_consumer , DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer"); // 指定Namesrv地址信息. consumer.setNamesrvAddr("IP地址:9876"); // 订阅Topic consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC //集群模式消费 consumer.setMessageModel(MessageModel.CLUSTERING); //取消 consumer.unsubscribe("TopicTest"); //再次订阅Topic即可 consumer.subscribe("TopicTest", "*"); //tag tagA|TagB|TagC // 注册回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for(MessageExt msg : msgs) { String topic = msg.getTopic(); String msgBody = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); Thread.sleep(1000); System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消息者 consumer.start(); //注销Consumer System.out.printf("Consumer Started.%n"); } }

2.1.5 结果展示

生产者发送消息结果
在这里插入图片描述
消费者收到消息结果
在这里插入图片描述

2.2 分区有序

2.2.1 什么是分区有序?

如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
在这里插入图片描述

2.2.2 分区有序实现的关键之处

在电商业务场景中,一个订单的流程是:创建、付款、推送、完成。在加入RocketMQ后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到RocketMQ中的一个主题中,这里该如何实现针对一个订单的消息顺序性呢!

在这里插入图片描述

要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。

2.2.3 生产者实现代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; /** * 部分顺序消息生产 */ public class ProducerInOrder { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("OrderProducer"); producer.setNamesrvAddr("IP地址:9876"); producer.start(); // 订单列表 List<Order> orderList = new ProducerInOrder().buildOrders(); for (int i = 0; i < orderList.size(); i++) { String body = orderList.get(i).toString(); Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes()); //msg:发送消息 //MessageQueueSelector:队列选择器 //arg:传入的参数 //producer.send(msg,MessageQueueSelector,arg); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; //根据订单id选择发送queue long index = id % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId());//订单id System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } /** * 订单类 */ private static class Order { private long orderId; private String desc; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } public String getDesc() { return desc; } public void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "Order{" + "orderId=" + orderId + ", desc='" + desc + ''' + '}'; } } /** * 生成模拟订单数据 3个订单 每个订单4个状态 * 每个订单 创建->付款->推送->完成 */ private List<Order> buildOrders() { List<Order> orderList = new ArrayList<Order>(); Order orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("创建"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("付款"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(002); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("推送"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(003); orderDemo.setDesc("完成"); orderList.add(orderDemo); orderDemo = new Order(); orderDemo.setOrderId(001); orderDemo.setDesc("完成"); orderList.add(orderDemo); return orderList; } }

2.2.4 消费者实现代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; import java.util.Random; import java.util.concurrent.TimeUnit; /** * 部分顺序消息消费 */ public class ConsumerInOrder { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2"); consumer.setNamesrvAddr("IP地址:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe("PartOrder", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.MILLISECONDS.sleep(random.nextInt(300)); } catch (Exception e) { e.printStackTrace(); //这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } }

2.1.5 结果展示

生产者发送消息结果
在这里插入图片描述

消费者收到消息结果

在这里插入图片描述

好了,到这里【RocketMQ中的顺序消息、生产者顺序生产消息、消费者顺序消费消息、顺序包括全局有序和分块有序、代码样例实战】就先学习到这里,更多RocketMQ的内容不断学习,不断创作中。加油!!!

最后

以上就是碧蓝牛排最近收集整理的关于【RocketMQ中的顺序消息、生产者顺序生产消息、消费者顺序消费消息、顺序包括全局有序和分块有序、代码样例实战】的全部内容,更多相关【RocketMQ中内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(86)

评论列表共有 0 条评论

立即
投稿
返回
顶部