This article shows how to connect Flink to Kafka and write the cleaned stream into PostgreSQL via a custom sink, shared here as a reference.

pom.xml dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc</artifactId>
    <version>1.6.1</version>
</dependency>
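
Note that the sink class below loads org.postgresql.Driver, which none of these three dependencies provide, so the PostgreSQL JDBC driver also needs to be on the classpath. A minimal sketch of the missing entry (the version shown is an assumption; any recent driver should do):

<!-- assumed addition: the PostgreSQL JDBC driver; version is illustrative -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.5</version>
</dependency>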

Configuration class BaseConf:

package com.conf;

public class BaseConf {
    public static final String USERNAME = "postgres";
    public static final String PASSWORD = "passwd";
    public static final String DRIVERNAME = "org.postgresql.Driver";
    public static final String URL = "jdbc:postgresql://192.168.108.***:5432/***";
}
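
The sink inserts into public.log_info, so that table has to exist before the job starts. A minimal DDL sketch, assuming plain text columns since the sink binds all five fields with setString:

-- assumed schema: all columns text, matching the five string fields of the Tuple5
create table if not exists public.log_info (
    ip          text,
    time        text,
    courseid    text,
    status_code text,
    referer     text
);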

Writing to the PostgreSQL database (reference: https://blog.csdn.net/weixin_43315211/article/details/88354331):

package com.sink;

import com.conf.BaseConf;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class PostSQLSink extends RichSinkFunction<Tuple5<String, String, String, String, String>> {

    private static final long serialVersionUID = 1L;
    private Connection connection;
    private PreparedStatement preparedStatement;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One JDBC connection per parallel sink instance
        Class.forName(BaseConf.DRIVERNAME);
        connection = DriverManager.getConnection(BaseConf.URL, BaseConf.USERNAME, BaseConf.PASSWORD);
        String sql = "insert into public.log_info(ip,time,courseid,status_code,referer) values (?,?,?,?,?)";
        preparedStatement = connection.prepareStatement(sql);
        super.open(parameters);
    }

    @Override
    public void invoke(Tuple5<String, String, String, String, String> value, Context context) {
        try {
            // Bind the five tuple fields to the INSERT and execute one row per record
            preparedStatement.setString(1, value.f0);
            preparedStatement.setString(2, value.f1);
            preparedStatement.setString(3, value.f2);
            preparedStatement.setString(4, value.f3);
            preparedStatement.setString(5, value.f4);
            System.out.println("Start insert");
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        if (preparedStatement != null) {
            preparedStatement.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
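
To check that rows are actually arriving, a plain JDBC query against the same table works; a hypothetical helper class (name and package are illustrative, not part of the original post) reusing the BaseConf constants:

package com.sink;

import com.conf.BaseConf;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical verification helper: counts the rows the sink has written so far.
public class CountRows {
    public static void main(String[] args) throws Exception {
        Class.forName(BaseConf.DRIVERNAME);
        try (Connection conn = DriverManager.getConnection(BaseConf.URL, BaseConf.USERNAME, BaseConf.PASSWORD);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("select count(*) from public.log_info")) {
            if (rs.next()) {
                System.out.println("rows in log_info: " + rs.getLong(1));
            }
        }
    }
}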

The main class:

package com.source;

import com.sink.PostSQLSink;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class FlinkCleanKafka {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5000 ms
        // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "master01:9092"); // Kafka broker IPs/hostnames, comma-separated
        properties.setProperty("zookeeper.connect", "master01:2181"); // ZooKeeper node IPs/hostnames, comma-separated
        properties.setProperty("group.id", "test22222");              // consumer group id of the Flink consumer

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<String>(
                "flink_test",
                new SimpleStringSchema(),
                properties);
        // myConsumer.setStartFromEarliest(); // consume from the earliest offset
        myConsumer.setStartFromLatest();      // consume from the latest offset

        DataStream<String> stream = env.addSource(myConsumer);
        // stream.print();

        DataStream<Tuple5<String, String, String, String, String>> cleanData = stream
                .map(new MapFunction<String, Tuple5<String, String, String, String, String>>() {
                    @Override
                    public Tuple5<String, String, String, String, String> map(String value) {
                        String[] data = value.split("\t");
                        String courseId = null;
                        // data[2] is expected to be like "GET /class/128.html HTTP/1.1"; take the request path
                        String url = data[2].split(" ")[1];
                        if (url.startsWith("/class")) {
                            String courseHtml = url.split("/")[2];
                            courseId = courseHtml.substring(0, courseHtml.lastIndexOf("."));
                        }
                        return Tuple5.of(data[0], data[1], courseId, data[3], data[4]);
                    }
                })
                .filter(new FilterFunction<Tuple5<String, String, String, String, String>>() {
                    @Override
                    public boolean filter(Tuple5<String, String, String, String, String> value) {
                        return value.f2 != null; // keep only records that carry a course id
                    }
                });

        cleanData.print();
        cleanData.addSink(new PostSQLSink());

        env.execute("Flink kafka");
    }
}
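
To see what the map function does to a single record, here is a standalone sketch of the same parsing logic that runs without Flink; the sample line is made up and only mirrors the tab-separated layout the job expects:

// Standalone illustration of the parsing in map(); the input record is hypothetical.
public class ParseDemo {
    public static void main(String[] args) {
        // hypothetical record: ip \t time \t request \t status_code \t referer
        String value = "110.52.250.126\t2019-03-10 12:01:01\tGET /class/128.html HTTP/1.1\t200\t-";
        String[] data = value.split("\t");
        String courseId = null;
        String url = data[2].split(" ")[1];        // "/class/128.html"
        if (url.startsWith("/class")) {
            String courseHtml = url.split("/")[2]; // "128.html"
            courseId = courseHtml.substring(0, courseHtml.lastIndexOf(".")); // "128"
        }
        System.out.println(courseId);              // prints: 128
    }
}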

For generating the Kafka test data, see:
https://blog.csdn.net/weixin_43315211/article/details/88424903
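
For a quick manual test without that generator, messages can also be typed into the topic with the console producer that ships with Kafka (broker address and topic name taken from the job above; the sample line is hypothetical and its fields must be separated by real tabs):

bin/kafka-console-producer.sh --broker-list master01:9092 --topic flink_test
> 110.52.250.126	2019-03-10 12:01:01	GET /class/128.html HTTP/1.1	200	-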

Wrapping up

That covers wiring Flink to Kafka with a PostgreSQL sink: FlinkKafkaConsumer011 reads raw log lines from the flink_test topic, the map/filter pair extracts the course id and drops records without one, and the custom RichSinkFunction writes each cleaned record into public.log_info.
