The Flink processing pipeline can be simplified into three steps: source → transformations → sink.
source: where the data comes from
transformations: the processing logic that Flink executes (the core)
sink: where the data goes once the distributed processing is finished
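A minimal sketch of the three steps wired together (the socket source and the uppercase map are illustrative placeholders, not part of the examples below):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source: read lines from a socket (assumes nc is listening on port 9999)
        DataStream<String> lines = env.socketTextStream("localhost", 9999, "\n");

        // transformations: the processing logic, here a trivial uppercase map
        DataStream<String> upper = lines.map(String::toUpperCase);

        // sink: where the results go, here the console
        upper.print();

        env.execute("three-step skeleton");
    }
}
```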
The built-in APIs a source can use to ingest data are shown below.
Common POM dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
```
1. Reading data from nc (netcat)

Core code:

```java
/**
 * Read data from port 9999. Start nc first (run: nc -l -p 9999),
 * otherwise the connection is refused.
 */
private static DataStream<String> getDataNC(StreamExecutionEnvironment env) {
    return env.socketTextStream("localhost", 9999, "\n");
}
```
2. Reading data from Kafka

Connector-specific POM dependency:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.11.1</version>
</dependency>
```
Core code:

```java
/**
 * Consume data from Kafka.
 */
private static DataStream<String> getDataKafka(StreamExecutionEnvironment env) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "ip:port");
    props.setProperty("group.id", "flink-group");
    FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
    return env.addSource(consumer);
}
```
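If you need control over where consumption begins, FlinkKafkaConsumer exposes explicit start positions; a small sketch (by default it resumes from the committed offsets of group.id):

```java
consumer.setStartFromEarliest();        // read the topic from the very beginning
// consumer.setStartFromLatest();       // or only records that arrive from now on
// consumer.setStartFromGroupOffsets(); // the default: committed group offsets
```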
Custom source

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// The type parameter must be specified explicitly here, unlike the sink below,
// which can stay generic.
public class MySource extends RichSourceFunction<String> {

    private volatile boolean running = true; // lets cancel() actually stop run()

    /**
     * run() is where the source produces data.
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Loop to emit the same static record over and over
        while (running) {
            ctx.collect("1,2,3,4,5");
            System.out.println(this + " MySource emitted data");
            Thread.sleep(3000);
        }
    }

    /**
     * open() is called once when the source starts; typically used for
     * initialization, e.g. opening a database connection.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("MySource initialized " + this);
        super.open(parameters);
    }

    /**
     * close() is called when the source exits; typically used to release
     * resources, e.g. closing a database connection.
     */
    @Override
    public void close() throws Exception {
        System.out.println("MySource releasing resources " + this);
        super.close();
    }

    @Override
    public void cancel() {
        System.out.println("MySource cancelled " + this);
        running = false;
    }
}
```
Using the custom source:

```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2); // set the parallelism of Flink's distributed computation
DataStream<String> data = env.addSource(new MySource());
```
Built-in sink APIs for emitting data are shown below:

```java
windowCounts.addSink(new PrintSinkFunction<>()); // print to the console
```
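For quick debugging, DataStream.print() is shorthand for the same thing; it attaches a PrintSinkFunction internally:

```java
windowCounts.print(); // equivalent to addSink(new PrintSinkFunction<>())
```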
After Flink's distributed computation completes, the sink is where you customize how the results are handled, for example persisting the data to MySQL.
A custom sink extends the class org.apache.flink.streaming.api.functions.sink.RichSinkFunction.
The code is as follows (a MySQL-flavored sketch follows after it):
```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Custom sink
 *
 * @param <T> element type
 */
public class MySink<T> extends RichSinkFunction<T> { // MySink can stay generic

    /**
     * open() is called once when the sink starts; typically used for
     * initialization, e.g. opening a database connection.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("Initialized " + this);
        super.open(parameters);
    }

    /**
     * invoke() holds the sink's processing logic; every record arriving from
     * the source side is handled here. The first parameter's type matches the
     * type parameter of RichSinkFunction; the second parameter carries
     * context information.
     */
    @Override
    public void invoke(T value, Context context) throws Exception {
        System.out.println(this + " output: " + value);
    }

    /**
     * close() is called when the sink shuts down; typically used to release
     * resources, e.g. closing a database connection.
     */
    @Override
    public void close() throws Exception {
        System.out.println("Releasing resources " + this);
        super.close();
    }
}
```
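The MySQL case mentioned above follows the same pattern: open the connection in open(), write in invoke(), release in close(). Below is a minimal sketch over plain JDBC; the table word_count(word, cnt), the connection URL, and the credentials are assumptions for illustration, and Flink 1.11 also ships an official flink-connector-jdbc that is usually the better choice in production.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical MySQL sink; assumes a table word_count(word VARCHAR, cnt BIGINT)
// and the mysql-connector-java driver on the classpath.
public class MySqlSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Connection conn;
    private transient PreparedStatement stmt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // URL and credentials are placeholders; adjust to your environment
        conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "user", "password");
        stmt = conn.prepareStatement(
                "INSERT INTO word_count (word, cnt) VALUES (?, ?)");
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        stmt.setString(1, value.f0);
        stmt.setLong(2, value.f1);
        stmt.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (stmt != null) stmt.close();
        if (conn != null) conn.close();
        super.close();
    }
}
```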
Complete example:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Flink word count
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // set the parallelism of Flink's distributed computation
        DataStream<String> data = env.addSource(new MySource()); // add the source

        // Split each line into words
        SingleOutputStreamOperator<String> flatMap = data.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String word : value.split(",")) {
                    out.collect(word);
                }
            }
        });

        // Map each word to a count of 1
        SingleOutputStreamOperator<Tuple2<String, Long>> map = flatMap.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                return new Tuple2<>(value, 1L);
            }
        });

        // Partition by key
        KeyedStream<Tuple2<String, Long>, String> word = map.keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> tp2) throws Exception {
                return tp2.f0; // partition by f0 of the Tuple2, i.e. the word
            }
        });

        // Sliding window: 5-second length, sliding every 1 second
        WindowedStream<Tuple2<String, Long>, String, TimeWindow> windowedStream =
                word.timeWindow(Time.seconds(5), Time.seconds(1));

        // Aggregate the counts
        SingleOutputStreamOperator<Tuple2<String, Long>> windowCounts =
                windowedStream.reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> tp2_1, Tuple2<String, Long> tp2_2) throws Exception {
                return new Tuple2<>(tp2_1.f0, tp2_1.f1 + tp2_2.f1);
            }
        });

        windowCounts.addSink(new MySink<>()); // the custom sink prints to the console

        env.execute("Socket Window WordCount"); // start execution
    }

    /**
     * Read data from port 9999. Start nc first (run: nc -l -p 9999),
     * otherwise the connection is refused.
     */
    private static DataStream<String> getDataNC(StreamExecutionEnvironment env) {
        return env.socketTextStream("localhost", 9999, "\n");
    }

    /**
     * Consume data from Kafka.
     */
    private static DataStream<String> getDataKafka(StreamExecutionEnvironment env) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "39.100.228.221:9092");
        props.setProperty("group.id", "flink-group");
        SourceFunction<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
        return env.addSource(consumer);
    }
}
```
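As a rough sanity check (these are processing-time windows, so exact numbers will vary): with parallelism 1, MySource emits the line 1,2,3,4,5 every 3 seconds, and each 1-second slide of the 5-second window that contains data should make MySink print one (word, count) Tuple2 per distinct word.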