Getting started with PySpark | Submitting PySpark jobs with spark-submit

https://github.com/hxjcarrie/pyspark_study

We take LogisticRegression as the running example.

  • Sample input data (one tab-separated row per sample: uid, then label, then the feature columns, as parsed by the code below)

(screenshot of the sample input data)
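Since the screenshot is not reproduced here, the following made-up row only illustrates the layout the parsing code below expects; the numbers are placeholders for illustration, not the author's data:

# one hypothetical example row (uid, label, 9 feature values), tab-separated
u_00001	1	0.13	2.0	0.0	5.4	1.0	0.0	0.27	3.1	0.8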
  • lrDemo.py (RDD-based spark.mllib)
#!coding=utf8
'''
author: huangxiaojuan
'''
import sys
reload(sys)
sys.setdefaultencoding('utf8')

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from time import *
import numpy
import os
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
#os.environ['PYSPARK_PYTHON'] = './python_env/py27/bin/python2'


def parseFloat(x):
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx


def parse(line, ifUid=False):
    l = line.split('\t')
    uid = l[0]
    label = parseFloat(l[1])
    features = map(lambda x: parseFloat(x), l[2:])
    if ifUid:
        return (uid, LabeledPoint(label, features))
    else:
        return LabeledPoint(label, features)


def main():
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print "Session created!"
    sc = spark.sparkContext
    # print the URL for tracking the job
    print "The url to track the job: http://namenode-01:8088/proxy/" + sc.applicationId

    sampleHDFS_train = sys.argv[1]
    sampleHDFS_test = sys.argv[2]
    outputHDFS = sys.argv[3]

    sampleRDD = sc.textFile(sampleHDFS_train).map(parse)
    predictRDD = sc.textFile(sampleHDFS_test).map(lambda x: parse(x, True))

    # train
    model = LogisticRegressionWithLBFGS.train(sampleRDD)
    model.clearThreshold()  # clear the default threshold (otherwise predict() returns 0/1 instead of a score)

    # predict and save the results
    labelsAndPreds = predictRDD.map(lambda p: (p[0], p[1].label, model.predict(p[1].features)))
    labelsAndPreds.map(lambda p: '\t'.join(map(str, p))).saveAsTextFile(outputHDFS + "/target/output")

    # precision / recall at different thresholds
    labelsAndPreds_label_1 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 1)
    labelsAndPreds_label_0 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 0)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\tfn\tfp\ttn\tprecision\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.filter(lambda lp: lp[2] > thre).count()
        fn = t_cnt - tp   # positives scored below the threshold
        fp = labelsAndPreds_label_0.filter(lambda lp: lp[2] > thre).count()
        tn = f_cnt - fp   # negatives scored below the threshold
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, fn, fp, tn, float(tp)/(tp+fp), float(tp)/(t_cnt)))

    # save and reload the model
    model.save(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")
    sameModel = LogisticRegressionModel.load(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")

    print "output:", outputHDFS

if __name__ == '__main__':
    main()
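Besides the threshold sweep above, spark.mllib ships a ranking-metrics evaluator. A minimal sketch (not part of the original script), assuming the labelsAndPreds RDD of (uid, label, score) triples built in main():

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# (score, label) pairs for the evaluator
scoreAndLabels = labelsAndPreds.map(lambda p: (float(p[2]), float(p[1])))
metrics = BinaryClassificationMetrics(scoreAndLabels)
print("AUC   = %.4f" % metrics.areaUnderROC)
print("AUPRC = %.4f" % metrics.areaUnderPR)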
  • lrDemo_df.py (DataFrame-based spark.ml)
#!coding=utf8
'''
author: huangxiaojuan
'''
import sys
reload(sys)
sys.setdefaultencoding('utf8')

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from time import *
import numpy
import os
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col


def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5',
                  'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst


def parseFloat(x):
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx


def getDict(dictDataLst, colLst):
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
        if colLst[i] == "label":
            # sample weight (both branches are 1.0 here; raise the first value to up-weight positives)
            dictData["weight"] = 1.0 if dictDataLst[i] == '1' else 1.0
    return dictData


def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)


def main():
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print "Session created!"
    sc = spark.sparkContext
    print "The url to track the job: http://bx-namenode-02:8088/proxy/" + sc.applicationId

    sampleHDFS_train = sys.argv[1]
    sampleHDFS_test = sys.argv[2]
    outputHDFS = sys.argv[3]

    featureLst, colLst = getFeatureName()

    # read data from HDFS and turn the RDDs into DataFrames
    # training data
    rdd_train = sc.textFile(sampleHDFS_train)
    rowRDD_train = rdd_train.map(lambda x: getDict(x.split('\t'), colLst))
    trainDF = spark.createDataFrame(rowRDD_train)
    # test data
    rdd_test = sc.textFile(sampleHDFS_test)
    rowRDD_test = rdd_test.map(lambda x: getDict(x.split('\t'), colLst))
    testDF = spark.createDataFrame(rowRDD_test)

    # assemble the columns in featureLst into a single feature vector
    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol("features")

    #### training ####
    print "step 1"
    lr = LogisticRegression(regParam=0.01, maxIter=100, weightCol="weight")  # regParam: regularization parameter
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(trainDF)
    # print the parameters
    print "\n-------------------------------------------------------------------------"
    print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
    print "-------------------------------------------------------------------------\n"

    #### predict and save the results ####
    print "step 2"
    labelsAndPreds = model.transform(testDF).withColumn("probability_xj", to_array(col("probability"))[1]) \
                          .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    labelsAndPreds.write.mode("overwrite").options(header="true").csv(outputHDFS + "/target/output")

    #### precision / recall at different thresholds ####
    print "step 3"
    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)
    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)
    labelsAndPreds_label_1.show(3)
    labelsAndPreds_label_0.show(3)

    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\tfn\tfp\ttn\tprecision\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()
        fn = t_cnt - tp   # positives scored below the threshold
        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()
        tn = f_cnt - fp   # negatives scored below the threshold
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, fn, fp, tn, float(tp)/(tp+fp), float(tp)/(t_cnt)))

    # save the model
    model.write().overwrite().save(outputHDFS + "/target/model/lrModel")
    # load the model
    #model.load(outputHDFS + "/target/model/lrModel")

    print "output:", outputHDFS

if __name__ == '__main__':
    main()
  • Model metrics printed in the driver log:

(screenshot of the metrics printed in the log)

spark-submit_lr.sh: submit the job to the cluster

ModelType=lrDemo
ModelType=lrDemo_df
#ModelType=xgbDemo

CUR_PATH=$(cd "$(dirname "$0")";pwd)
echo $CUR_PATH

SPARK_PATH=/user/spark/spark
YARN_QUEUE=
DEPLOY_MODE=cluster
DEPLOY_MODE=client

input_path_train=hdfs://
input_path_test=hdfs://
output_path=hdfs://user/huangxiaojuan/program/sparkDemo/${ModelType}
hadoop fs -rmr $output_path

${SPARK_PATH}/bin/spark-submit \
 --master yarn \
 --name "spark_demo_lr" \
 --queue ${YARN_QUEUE} \
 --deploy-mode ${DEPLOY_MODE} \
 --driver-memory 6g \
 --driver-cores 4 \
 --executor-memory 12g \
 --executor-cores 15 \
 --num-executors 10 \
 --archives ./source/py27.zip#python_env \
 --conf spark.default.parallelism=150 \
 --conf spark.executor.memoryOverhead=4g \
 --conf spark.driver.memoryOverhead=2g \
 --conf spark.yarn.maxAppAttempts=3 \
 --conf spark.yarn.submit.waitAppCompletion=true \
 --conf spark.pyspark.driver.python=./source/py27/bin/python2 \
 --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2 \
 --conf spark.pyspark.python=./python_env/py27/bin/python2 \
 ./${ModelType}.py $input_path_train $input_path_test $output_path
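Before submitting to YARN, a quick local smoke test can catch syntax and path errors early. This is a sketch under the assumption that a local Spark install is available; the sample file names are placeholders, not from the original post:

# hypothetical local run: local[4] master, small local files instead of HDFS paths
${SPARK_PATH}/bin/spark-submit --master "local[4]" \
 ./lrDemo.py ./sample_train.txt ./sample_test.txt ./local_output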
  • nohup sh -x spark-submit_lr.sh > spark-submit_lr.log 2>&1 &
  • Kill a job: yarn application -kill application_xxxxxxxxx_xxxxx
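Besides killing a job, the standard YARN CLI can also list running applications and pull their logs (the application id below is a placeholder):

yarn application -list -appStates RUNNING               # list running applications
yarn logs -applicationId application_xxxxxxxxx_xxxxx    # fetch aggregated logs once the app has finished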

Uploading a Python environment

  • The Python versions on the driver and the executors must match.
  • If the executors' Python does not meet the requirements, a packaged Python environment can be shipped to the executors with the following options:
# ship the packaged Python environment to the executors
--archives ./source/py27.zip#python_env

# Python path on the executors
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2
--conf spark.pyspark.python=./python_env/py27/bin/python2

# Python path on the driver
--conf spark.pyspark.driver.python=./source/py27/bin/python2

# or upload the archive to HDFS first
--conf spark.yarn.dist.archives=hdfs://user/huangxiaojuan/py27.zip#python_env
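One possible way to build the py27.zip archive referenced above (an assumption, not described in the original post) is to create a Python 2.7 virtualenv named py27, install the required packages, and zip the directory so the archive unpacks to py27/bin/python2. Zipped virtualenvs are not always relocatable across machines, so environment-packing tools are a common alternative:

# hypothetical build of py27.zip (layout must match ./python_env/py27/bin/python2)
virtualenv --python=python2.7 py27
./py27/bin/pip install numpy
zip -r py27.zip py27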

References

Beginners can start with:

Submitting Applications

https://spark.apache.org/docs/latest/quick-start.html

spark.mllib reference:

MLlib: Main Guide (spark.apache.org)

spark.ml reference:

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html

Differences between the two:

(table from the Spark docs comparing the RDD-based spark.mllib API with the DataFrame-based spark.ml API)
  • spark.sql reference:
Getting Started - Spark 2.4.4 Documentation (spark.apache.org)
  • Example of joining tables with the DataFrame API:

https://github.com/hxjcarrie/pyspark_study/blob/master/df.py

  • Example of joining tables with spark.sql:

https://github.com/hxjcarrie/pyspark_study/blob/master/df_sql.py

  • To write to a Hive table, see: https://blog.csdn.net/lulynn/article/details/51543567
  • XGBoost reference:
叶一一: pyspark 数据分析-xgboost (zhuanlan.zhihu.com)
  • Scala reference:
叶一一: scala入门 | spark-submit提交scala任务 (zhuanlan.zhihu.com)

-------------------------------------------------------------------------------------------

If you need RDDs nested inside RDDs, or the algorithm you need only exists in Python's own scikit-learn, you can group the samples and distribute the groups instead. Model training itself is still single-machine, so this only works when each group's data is small enough to train comfortably in one machine's memory.


Say you find yourself in the peculiar situation where you need to train a whole bunch of scikit-learn models over different groups from a large amount of data. And say you want to leverage Spark to distribute the process to do it all in a scalable fashion.

Using PySpark and Pandas UDFs to train many scikit-learn models distributedly (patrickroos.org)
(illustration from the article above)
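As a rough sketch of the approach described in that article, a grouped-map Pandas UDF (Spark 2.3/2.4 style, requires pyarrow on the executors) collects each group into a pandas DataFrame on one executor, fits a scikit-learn model on it, and returns one summary row per group. The DataFrame df and the group_id/label/feature column names below are assumptions for illustration, not the author's code:

import pandas as pd
from sklearn.linear_model import LogisticRegression
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

result_schema = StructType([
    StructField("group_id", StringType()),
    StructField("train_acc", DoubleType()),
])

@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def train_per_group(pdf):
    # pdf holds all rows of one group as a pandas DataFrame
    X = pdf[["feature1", "feature2"]].values
    y = pdf["label"].values
    clf = LogisticRegression().fit(X, y)   # single-machine sklearn fit per group
    return pd.DataFrame([{"group_id": pdf["group_id"].iloc[0],
                          "train_acc": float(clf.score(X, y))}])

resultDF = df.groupBy("group_id").apply(train_per_group)
resultDF.show()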
