kafka手动提交
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36spring: kafka: track_topic: track.data.receive bootstrap-servers: localhost:9092 listener: # 主题不存在不报错 missing-topics-fatal: false # 并发消费 concurrency: 3 # 手动提交 ack-mode: manual_immediate consumer: group-id: XXX enable-auto-commit: false # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # 自动提交时间,时间过长,可能存在重复消费问题 # auto-commit-interval: 2000 # bootstrap-servers: localhost:9092 # key/value的反序列化 key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: # 重试次数 retries: 0 # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288 # bootstrap-servers: localhost:9092 # key/value的序列化 key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13@KafkaListener(topics = "${spring.kafka.track_topic}") public void getMessage(String messageJsonString, Acknowledgment ack) { Message message = JSONObject.parseObject(messageJsonString, Message.class); boolean b = deviceTrackService.saveMessage(message); if (b) { log.info("SELF-消费成功"); ack.acknowledge(); } else { log.error("消费失败:"+message); } } }
最后
以上就是爱听歌滑板最近收集整理的关于kafka手动提交的全部内容,更多相关kafka手动提交内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复