我是靠谱客的博主 无情含羞草,这篇文章主要介绍 Spring Boot 与 Kafka 集成的实例,现在分享给大家,希望可以做个参考。

一、依赖

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Dependency versions below are inherited from the Spring Boot parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.citydo</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Embedded web server; server.port is set in the application configuration. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka client libraries and the Spring integration (KafkaTemplate, @KafkaListener). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- JSON (de)serialization of the message payload. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <!-- Compile-time code generation (@Getter, @Setter, @ToString, @Slf4j). -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test-only dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <!-- NOTE(review): this exclusion looks like a no-op with Boot 2.1.x; confirm before relying on it. -->
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

二、配置

复制代码
1
2
3
4
5
6
7
8
9
10
# HTTP port of the embedded web server.
server.port=8999

# Kafka broker(s) the application connects to.
spring.kafka.bootstrap-servers=192.168.0.195:9092

# Consumer: group id and String deserializers for record keys and values.
spring.kafka.consumer.group-id=Group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Producer: String serializers for record keys and values.
# A producer has no consumer group, so no group id is configured here, and it is
# configured with key-serializer / value-serializer (not *-deserializer keys).
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

三、消息实体(参数)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.citydo.kafka;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
 * Mutable data holder for a payment message.
 *
 * <p>Getters, setters and {@code toString()} are generated by Lombok for every field.
 */
@ToString
@Setter
@Getter
public class PayMessage {

    /** Code identifying the order this message refers to. */
    private String orderCode;

    /**
     * Fee of the order.
     * NOTE(review): {@code Float} loses precision for monetary amounts; consider
     * {@code BigDecimal} if this ever carries real money. Confirm with callers first.
     */
    private Float fee;

    /** Time the message was sent; presumably epoch milliseconds (verify against the sender). */
    private Long sendTime;
}

四、消费者

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.citydo.kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
 * Kafka listener that receives JSON payloads from {@link #PAY_TOPIC}, converts them to
 * {@link PayMessage} and logs the result.
 */
@Service
@Slf4j
public class MessageConsumer {

    /** Name of the topic this consumer listens on. */
    public static final String PAY_TOPIC = "payTopic";

    /** JSON converter; never reassigned, hence final. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Handles one record value received from {@link #PAY_TOPIC}.
     *
     * <p>No exception is caught here, so anything thrown while parsing propagates to the caller.
     *
     * @param payMessage the record value, expected to be the JSON form of a {@link PayMessage}
     */
    @KafkaListener(topics = PAY_TOPIC)
    public void onMessage(String payMessage) {
        PayMessage msg = gson.fromJson(payMessage, PayMessage.class);
        // Parameterized logging: same output as "msg" + msg, but the message is only
        // formatted when INFO is enabled.
        log.info("msg{}", msg);
    }
}

五、生产者

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.citydo.kafka;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Publishes {@link PayMessage} instances to {@link #PAY_TOPIC} as JSON strings.
 */
@Component
@Slf4j
public class MessageProducer {

    /** Name of the topic messages are sent to. */
    public static final String PAY_TOPIC = "payTopic";

    /**
     * Template used to send records. Typed as {@code <String, String>} instead of a raw type
     * because only String values are sent, which removes the unchecked call in {@link #send}.
     */
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** JSON converter; never reassigned, hence final. */
    private final Gson gson = new GsonBuilder().create();

    /**
     * Serializes the given message to JSON, sends it to {@link #PAY_TOPIC} and logs the JSON.
     *
     * <p>The value returned by {@code kafkaTemplate.send} is not inspected here.
     *
     * @param payMessage the message to publish
     */
    public void send(PayMessage payMessage) {
        String msg = gson.toJson(payMessage);
        kafkaTemplate.send(PAY_TOPIC, msg);
        // Parameterized logging: same output as "msg" + msg.
        log.info("msg{}", msg);
    }
}

六、启动

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.citydo.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ApplicationContext; import java.util.UUID; @SpringBootApplication @Slf4j public class KafkaApplication { public static void main(String[] args) { ApplicationContext applicationContext = SpringApplication.run(KafkaApplication.class, args); //发送消息 MessageProducer producer = applicationContext.getBean(MessageProducer.class); while (true){ PayMessage message = new PayMessage(); message.setFee((float) System.currentTimeMillis()); message.setOrderCode(UUID.randomUUID().toString()); message.setSendTime(System.currentTimeMillis()); producer.send(message); try { Thread.sleep(2000); } catch (InterruptedException e) { log.info("{}",e.getMessage()); } } } }

最后

以上就是无情含羞草最近收集整理的关于 Spring Boot 与 Kafka 集成实例的全部内容,更多相关 Spring Boot 与 Kafka 的内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(103)

评论列表共有 0 条评论

立即
投稿
返回
顶部