I'm 仁爱台灯, a blogger at 靠谱客. This post walks through writing Spark Streaming data-cleaning results to MySQL; I'm sharing it here in the hope that it can serve as a reference.

Flume + Kafka + Spark Streaming + MySQL
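The job below consumes log records from the Kafka topic test_m_brokers on brokers vm04/vm05/vm06, keeps the client IP, video path, video id, and device type, and appends the result to a table named echarts in the echarts database on vm04. As a minimal sketch, that target table can be created up front over plain JDBC; the host, database, table, column names, user, and password placeholder are taken from the job's JDBC settings, while the VARCHAR lengths are assumptions since the job writes every field as a string. Spark's JDBC writer can also create the table on the first append, but creating it explicitly keeps the column types under your control.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Sketch only: pre-create the MySQL table the streaming job below appends to.
// Host, database, table, columns, user and password placeholder mirror the job;
// the VARCHAR lengths are assumptions.
public class CreateEchartsTable {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://vm04:3306/echarts";
        try (Connection conn = DriverManager.getConnection(url, "root", "xxxxx");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate(
                "CREATE TABLE IF NOT EXISTS echarts ("
                + " ip      VARCHAR(64),"
                + " video   VARCHAR(255),"
                + " videoid VARCHAR(64),"
                + " device  VARCHAR(64)"
                + ")");
        }
    }
}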

package util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataClean {

    public static void main(String[] args) throws InterruptedException {
        SparkSession spark = SparkSession.builder().master("local").appName("dataClean").getOrCreate();
        // Obtain a JavaSparkContext from the SparkSession and build a streaming context with 2-second batches
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(2));

        // Kafka direct-stream parameters: broker list and the topic to consume
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("bootstrap.servers", "vm04:9092,vm05:9092,vm06:9092");
        Set<String> topics = new HashSet<String>();
        topics.add("test_m_brokers");

        JavaPairDStream<String, String> lines = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Clean each record: keep the IP, the video path, the video id and the device type,
        // joined with tab characters
        JavaDStream<String> words = lines.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple) throws Exception {
                String[] strs = tuple._2.split(",");
                String ids = strs[9].split("/")[3];
                return strs[1] + "\t" + strs[9] + "\t" + ids + "\t" + strs[11];
            }
        });
        // words.print();

        // Write every batch to MySQL
        words.foreachRDD(rdd -> {
            String url = "jdbc:mysql://vm04:3306/echarts";
            Properties connectionProperties = new Properties();
            connectionProperties.put("user", "root");
            connectionProperties.put("password", "xxxxx");
            connectionProperties.put("driver", "com.mysql.jdbc.Driver");

            // Turn each tab-separated line back into a Row
            JavaRDD<Row> ip = rdd.map(new Function<String, Row>() {
                public Row call(String line) throws Exception {
                    String[] tmps = line.split("\t");
                    return RowFactory.create(String.valueOf(tmps[0]), String.valueOf(tmps[1]),
                            String.valueOf(tmps[2]), String.valueOf(tmps[3]));
                }
            });

            // Schema of the target table: ip, video, videoid, device (all strings)
            ArrayList<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("ip", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("video", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("videoid", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("device", DataTypes.StringType, true));
            StructType type = DataTypes.createStructType(fields);

            Dataset<Row> ipDF = spark.createDataFrame(ip, type);
            ipDF.write().mode("append").jdbc(url, "echarts", connectionProperties);
            // spark.close();
        });

        ssc.start();
        ssc.awaitTermination();
    }
}
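For reference, this is how a single record moves through the map step above: the job assumes comma-separated log lines in which field 1 is the client IP, field 9 is the video path (with the video id as its fourth "/"-separated segment), and field 11 is the device type. The standalone sketch below uses a made-up sample record purely to illustrate the indexing; the real layout depends on what Flume ships into Kafka.

public class ParseDemo {
    public static void main(String[] args) {
        // Hypothetical record; only fields 1, 9 and 11 matter to the cleaning step.
        String record = "ts,192.168.1.10,-,-,-,-,-,-,-,/learn/video/1001,-,android";
        String[] strs = record.split(",");
        String ids = strs[9].split("/")[3];   // fourth "/"-segment -> "1001"
        String cleaned = strs[1] + "\t" + strs[9] + "\t" + ids + "\t" + strs[11];
        System.out.println(cleaned);          // 192.168.1.10  /learn/video/1001  1001  android
    }
}

Running it prints the IP, video path, video id and device separated by tabs, which is exactly the shape the foreachRDD step splits back apart before writing to MySQL.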

Finally

That's all of the material 仁爱台灯 has recently collected and organized on writing Spark Streaming data-cleaning results to MySQL. For more Spark-related content, please search the other articles on 靠谱客.
