
Spark: A PySpark machine learning workflow for Titanic survivor prediction with logistic regression and decision tree models

Date: 2023-06-25

A quick review of PySpark's ML package.

Contents

- Official documentation
- Environment
- Titanic data analysis
- Titanic data cleaning
- Spark ML Pipeline
- Titanic survivor prediction: logistic regression (LR) model
  - Model training
  - Model prediction
- Titanic survivor prediction: decision tree model

Official documentation

https://spark.apache.org/docs/2.4.5/api/python/pyspark.ml.html

Environment

- Python 3
- Spark 2.4.8
- Jupyter Notebook
- JDK 8

For integrating Jupyter Notebook with Spark, see: https://blog.csdn.net/maoyuanming0806/article/details/122886518

Titanic data analysis

Once we have the data, the first step is to analyze it; only then can we do feature engineering.

We use charts and statistical methods to describe the distribution of the data, the numeric characteristics of each field, and the relationships between values. Descriptive analysis falls into central tendency analysis, dispersion analysis, and correlation analysis:

- Central tendency analysis: mean, median, mode, etc.
- Dispersion analysis: maximum, minimum, interquartile range, mean deviation, variance, standard deviation
- Correlation analysis: whether there is a statistically meaningful relationship between fields
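As a quick illustration of these statistics in PySpark, here is a minimal sketch, assuming the training DataFrame df_train has already been loaded as in the full script below; the column names follow the Titanic CSV.

```python
from pyspark.sql import functions as F

# Central tendency and dispersion for a couple of numeric columns
df_train.select(
    F.mean("Age").alias("mean_age"),
    F.stddev("Age").alias("stddev_age"),
    F.min("Fare").alias("min_fare"),
    F.max("Fare").alias("max_fare"),
).show()

# Pearson correlation between two numeric columns
print(df_train.stat.corr("Pclass", "Survived"))
```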

The full analysis script:

```python
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True).cache()
# df_test = spark.read.csv('./data/titanic-test.csv', header=True, inferSchema=True).cache()

# Basic descriptive statistics
print("Dispersion analysis: summary statistics " + "===================================")
df_train.describe("Age", "Pclass", "SibSp", "Parch").show()
df_train.describe("Sex", "Cabin", "Embarked", "Fare", "Survived").show()

print("Correlation analysis: survivor counts by sex " + "===================================")
pdf = df_train.groupBy('Sex', 'Survived').agg({'PassengerId': 'count'}) \
    .withColumnRenamed('count(PassengerId)', 'count').orderBy('Sex').toPandas()
print(pdf)

# Chart: survival vs. sex
print("Chart: survival vs. sex " + "===================================")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

width = 0.35
fig, ax = plt.subplots()
labels = ["female", "male"]
survived_counts = pdf[pdf["Survived"] == 1]["count"]
unsurvived_counts = pdf[pdf["Survived"] == 0]["count"]
ax.bar(labels, survived_counts, width, label='Survived')
ax.bar(labels, unsurvived_counts, width, bottom=survived_counts, label='UnSurvived')
ax.set_ylabel('count')
ax.set_title('Survival vs. sex')
ax.legend()
plt.show()

print("Correlation analysis: survivor counts by passenger class " + "===================================")
pdf = df_train.groupBy('Pclass', 'Survived').agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)", "count").toPandas()
print(pdf)

print("Correlation analysis: survivor counts by number of parents/children aboard " + "===================================")
pdf = df_train.groupBy('Parch', 'Survived').agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)", "count").toPandas()
print(pdf)

print("Correlation analysis: survivor counts by number of siblings/spouses aboard " + "===================================")
pdf = df_train.groupBy('SibSp', 'Survived').agg({'PassengerId': 'count'}) \
    .withColumnRenamed("count(PassengerId)", "count").toPandas()
print(pdf)
#############################################
sc.stop()
```

The full output is too long to paste here. In short, the numeric summaries and charts above let us check whether, and how, each feature relates to survival, which informs the (somewhat subjective) feature selection that follows.

Titanic data cleaning

The analysis above shows that some of the data needs processing before modelling, e.g. type conversion and missing-value handling, as part of feature engineering.

```python
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("PySpark ML") \
    .getOrCreate()
sc = spark.sparkContext
#############################################
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True).cache()

print("Feature processing " + "===================================")
# Missing values: fill Age with the mean age, roughly 29.699
df_train = df_train.fillna({'Age': round(29.699, 0)})
# Missing values: fill Embarked with the most frequent port, 'S'
df_train = df_train.fillna({'Embarked': 'S'})
# Drop columns that are not used
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")
# Convert string labels to numeric indices
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
df_train.show(5)

# Feature selection
print("Feature selection " + "===================================")
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
train_features = df_train[features]
train_features.show(5)

# Feature vectorization (Survived is included here only for this preview;
# the training scripts below assemble the seven feature columns only)
print("Feature vectorization " + "===================================")
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived'],
    outputCol="features")
df = df_assembler.transform(train_features)
df.show(5)

#############################################
# Apply the same processing to the test data
df_test = spark.read.csv('./data/titanic-test.csv', header=True, inferSchema=True).cache()
# Fill missing ages with the mean value 29.699
df_test = df_test.fillna({'Age': round(29.699, 0)})
# Fill missing Embarked values with the most frequent port 'S'
df_test = df_test.fillna({'Embarked': 'S'})
# Drop columns that are not used
df_test = df_test.drop("Cabin")
df_test = df_test.drop("Ticket")
# Convert string labels to numeric indices
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
df_test.show()

# Feature selection
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked']
test_features = df_test[features]
test_features.show()

# Feature vectorization
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
df2 = df_assembler.transform(test_features)
df2.select("features").show()
#############################################
sc.stop()
```

The full output is again too long to paste and the comments in the code are self-explanatory, so only the first few tables are shown:

```
Feature processing ===================================
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Fare|Embarked|iEmbarked|iSex|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|   7.25|       S|      0.0| 0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|71.2833|       C|      1.0| 1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|  7.925|       S|      0.0| 1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|   53.1|       S|      0.0| 1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|   8.05|       S|      0.0| 0.0|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+--------+---------+----+
only showing top 5 rows

Feature selection ===================================
+------+----+----+-----+-----+-------+---------+--------+
|Pclass|iSex| Age|SibSp|Parch|   Fare|iEmbarked|Survived|
+------+----+----+-----+-----+-------+---------+--------+
|     3| 0.0|22.0|    1|    0|   7.25|      0.0|       0|
|     1| 1.0|38.0|    1|    0|71.2833|      1.0|       1|
|     3| 1.0|26.0|    0|    0|  7.925|      0.0|       1|
|     1| 1.0|35.0|    1|    0|   53.1|      0.0|       1|
|     3| 0.0|35.0|    0|    0|   8.05|      0.0|       0|
+------+----+----+-----+-----+-------+---------+--------+
only showing top 5 rows

Feature vectorization ===================================
+------+----+----+-----+-----+-------+---------+--------+--------------------+
|Pclass|iSex| Age|SibSp|Parch|   Fare|iEmbarked|Survived|            features|
+------+----+----+-----+-----+-------+---------+--------+--------------------+
|     3| 0.0|22.0|    1|    0|   7.25|      0.0|       0|(8,[0,2,3,5],[3.0...|
|     1| 1.0|38.0|    1|    0|71.2833|      1.0|       1|[1.0,1.0,38.0,1.0...|
|     3| 1.0|26.0|    0|    0|  7.925|      0.0|       1|[3.0,1.0,26.0,0.0...|
|     1| 1.0|35.0|    1|    0|   53.1|      0.0|       1|[1.0,1.0,35.0,1.0...|
|     3| 0.0|35.0|    0|    0|   8.05|      0.0|       0|(8,[0,2,5],[3.0,3...|
+------+----+----+-----+-----+-------+---------+--------+--------------------+
only showing top 5 rows
```
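One caveat in the script above: the StringIndexer instances are fitted separately on the training and test sets, so the string-to-index mappings are not guaranteed to match. A minimal sketch of avoiding this with a preprocessing Pipeline (the Pipeline abstraction is introduced in the next section), assuming df_train and df_test are the cleaned DataFrames before the indexing steps and that missing Fare values in the test set are also filled (as in the prediction script later):

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
sex_indexer = StringIndexer(inputCol="Sex", outputCol="iSex")
assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")

prep_pipeline = Pipeline(stages=[embarked_indexer, sex_indexer, assembler])
prep_model = prep_pipeline.fit(df_train)    # fit the index mappings on training data only

train_vec = prep_model.transform(df_train)  # the same fitted mappings are applied to both
test_vec = prep_model.transform(df_test)
```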

Spark ML Pipeline

A machine learning workflow consists of a sequence of steps. Spark's Pipeline abstraction captures the whole process: we simply assemble the stages into a Pipeline in the desired order.

Transformer stages are applied with transform(); Estimator stages are fitted with fit(), producing a Model (itself a Transformer) that is then used to generate predictions.

An example Pipeline:

```python
import findspark
findspark.init()
##############################################
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
# Mock training data: texts containing "PySpark" are labelled 1.0, others 0.0
training = spark.createDataFrame([
    (0, "Hello PySpark", 1.0),
    (1, "Using Flink", 0.0),
    (2, "PySpark 3.0", 1.0),
    (3, "Test MySQL", 0.0)
], ["id", "text", "label"])

# Pipeline with three stages: tokenizer (split text into words)
# -> hashingTF (hash words into a feature vector) -> logR (LR model)
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
logR = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, logR])

# Fitting the pipeline on the training data produces a PipelineModel
model = pipeline.fit(training)

##############################################
# Test data
test = spark.createDataFrame([
    (4, "PySpark Pipeline"),
    (5, "pipeline"),
    (6, "PySpark python"),
    (7, "julia c#")
], ["id", "text"])

# Run the fitted model's transform to generate predictions
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    tid, text, prob, prediction = row
    print("(%d, %s) --> prediction=%f,prob=%s" % (tid, text, prediction, str(prob)))
#############################################
sc.stop()
```

Execution output:

```
(4, PySpark Pipeline) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(5, pipeline) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
(6, PySpark python) --> prediction=1.000000,prob=[0.029796174862816768,0.9702038251371833]
(7, julia c#) --> prediction=0.000000,prob=[0.56611226449896,0.43388773550104]
```
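The fitted PipelineModel can also be persisted and loaded back as a whole, which keeps the feature stages and the model together. A minimal sketch (run before sc.stop() in the script above; the path ./model/lr-pipeline is just an example):

```python
from pyspark.ml import PipelineModel

# Save the fitted pipeline (tokenizer + hashingTF + LR model) and reload it
model.write().overwrite().save("./model/lr-pipeline")
reloaded = PipelineModel.load("./model/lr-pipeline")
reloaded.transform(test).select("id", "text", "prediction").show()
```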

Titanic survivor prediction: logistic regression (LR) model

For a detailed introduction to the LR model, see: https://blog.csdn.net/maoyuanming0806/article/details/120535181

Model training

```python
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
# Data cleaning, feature selection and feature vectorization
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True).cache()
df_train = df_train.fillna({'Age': round(29.699, 0)})
df_train = df_train.fillna({'Embarked': 'S'})
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
train_features = df_train[features]
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
train_data = df_assembler.transform(train_features)

# Train the LR model
from pyspark.ml.classification import LogisticRegression
lg = LogisticRegression(labelCol='Survived')
lgModel = lg.fit(train_data)

# Save the model
lgModel.write().overwrite().save("./model/logistic-titanic")
print("save model to ./model/logistic-titanic")

# Inspect the training summary
trainingSummary = lgModel.summary
trainingSummary.roc.show(5)
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Plot the ROC curve
import matplotlib.pyplot as plt
plt.figure(figsize=(5, 5))
plt.plot([0, 1], [0, 1], 'r--')
# Workaround for a PySpark bug: without the assignments below, collect() fails with
# AttributeError: 'NoneType' object has no attribute 'setCallSite'
df_FPR = lgModel.summary.roc.select('FPR')
df_FPR.sql_ctx.sparkSession._jsparkSession = spark._jsparkSession
df_FPR._sc = spark._sc
df_TPR = lgModel.summary.roc.select('TPR')
df_TPR.sql_ctx.sparkSession._jsparkSession = spark._jsparkSession
df_TPR._sc = spark._sc
plt.plot(df_FPR.collect(), df_TPR.collect())
plt.xlabel('FPR')
plt.ylabel('TPR')
plt.show()
#############################################
sc.stop()
```

Note that evaluation metrics such as AUC and the ROC curve should normally be computed on held-out test data; above, for convenience, they are drawn from the training data.
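A minimal sketch of doing that with a held-out split, assuming train_data from the training script above (run before sc.stop()); BinaryClassificationEvaluator computes AUC from the rawPrediction column:

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Hold out 20% of the labelled data for validation
train_split, valid_split = train_data.randomSplit([0.8, 0.2], seed=42)
lr_model = LogisticRegression(labelCol='Survived').fit(train_split)

# Evaluate AUC on the held-out rows instead of the training rows
valid_pred = lr_model.transform(valid_split)
evaluator = BinaryClassificationEvaluator(labelCol='Survived', metricName='areaUnderROC')
print("validation AUC:", evaluator.evaluate(valid_pred))
```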

Model prediction

```python
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
df_test = spark.read.csv('./data/titanic-test.csv', header=True, inferSchema=True).cache()

# Data cleaning, feature selection and feature vectorization
df_test = df_test.fillna({'Age': round(29.699, 0)})
df_test = df_test.fillna({'Embarked': 'S'})
df_test = df_test.fillna({'Fare': 36.0})
df_test = df_test.drop("Cabin")
df_test = df_test.drop("Ticket")
# Add a Survived column with default value 0 (evaluate() expects a label column)
df_test = df_test.withColumn("Survived", 0 * df_test["Age"])
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_test)
df_test = model.transform(df_test)
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
test_features = df_test[features]
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
test = df_assembler.transform(test_features)

# Load the saved LR model and run it on the test features
from pyspark.ml.classification import LogisticRegressionModel
lgModel = LogisticRegressionModel.load("./model/logistic-titanic")
testSummary = lgModel.evaluate(test)
results = testSummary.predictions
results.select("features", "rawPrediction", "probability", "prediction").show(5)
#############################################
sc.stop()
```

Execution output:

```
+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|[3.0,0.0,34.5,0.0...|[1.99328605097899...|[0.88009035220072...|       0.0|
|[3.0,1.0,47.0,1.0...|[0.63374031844971...|[0.65333708360849...|       0.0|
|[2.0,0.0,62.0,0.0...|[1.97058477648159...|[0.87767391006101...|       0.0|
|(7,[0,2,5],[3.0,2...|[2.21170839644084...|[0.90129601257823...|       0.0|
|[3.0,1.0,22.0,1.0...|[-0.2919725495559...|[0.42752102300610...|       1.0|
+--------------------+--------------------+--------------------+----------+
only showing top 5 rows
```

Titanic survivor prediction: decision tree model

For details on decision tree models, see: https://blog.csdn.net/maoyuanming0806/article/details/120207024

```python
import findspark
findspark.init()
##############################################
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.master("local[*]").appName("PySpark ML").getOrCreate()
sc = spark.sparkContext
#############################################
# Data cleaning, feature selection and feature vectorization
df_train = spark.read.csv('./data/titanic-train.csv', header=True, inferSchema=True).cache()
df_train = df_train.fillna({'Age': round(29.699, 0)})
df_train = df_train.fillna({'Embarked': 'S'})
df_train = df_train.drop("Cabin")
df_train = df_train.drop("Ticket")
labelIndexer = StringIndexer(inputCol="Embarked", outputCol="iEmbarked")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
labelIndexer = StringIndexer(inputCol="Sex", outputCol="iSex")
model = labelIndexer.fit(df_train)
df_train = model.transform(df_train)
features = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked', 'Survived']
train_features = df_train[features]
df_assembler = VectorAssembler(
    inputCols=['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked'],
    outputCol="features")
train_data = df_assembler.transform(train_features)

# Train the decision tree model
from pyspark.ml.classification import DecisionTreeClassifier
dtree = DecisionTreeClassifier(labelCol='Survived', featuresCol='features')
treeModel = dtree.fit(train_data)
print(treeModel.toDebugString)

# Predict on the training data
dt_predictions = treeModel.transform(train_data)
dt_predictions.select("prediction", "Survived", "features").show()

# Evaluate accuracy on the training data
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(labelCol='Survived', metricName='accuracy')
print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))
#############################################
sc.stop()
```

Execution output. The debug printout of the tree structure (toDebugString) is omitted here; we go straight to the predictions and accuracy:

```
+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[3.0,0.0,22.0,1.0...|
|       1.0|       1|[1.0,1.0,38.0,1.0...|
|       1.0|       1|[3.0,1.0,26.0,0.0...|
|       1.0|       1|[1.0,1.0,35.0,1.0...|
|       0.0|       0|(7,[0,2,5],[3.0,3...|
|       0.0|       0|[3.0,0.0,30.0,0.0...|
|       0.0|       0|(7,[0,2,5],[1.0,5...|
|       0.0|       0|[3.0,0.0,2.0,3.0,...|
|       1.0|       1|[3.0,1.0,27.0,0.0...|
|       1.0|       1|[2.0,1.0,14.0,1.0...|
|       1.0|       1|[3.0,1.0,4.0,1.0,...|
|       1.0|       1|[1.0,1.0,58.0,0.0...|
|       0.0|       0|(7,[0,2,5],[3.0,2...|
|       0.0|       0|[3.0,0.0,39.0,1.0...|
|       1.0|       0|[3.0,1.0,14.0,0.0...|
|       1.0|       1|[2.0,1.0,55.0,0.0...|
|       0.0|       0|[3.0,0.0,2.0,4.0,...|
|       0.0|       1|(7,[0,2,5],[2.0,3...|
|       1.0|       0|[3.0,1.0,31.0,1.0...|
|       1.0|       1|[3.0,1.0,30.0,0.0...|
+----------+--------+--------------------+
only showing top 20 rows

Decision Tree Accu: 0.8417508417508418
```
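Beyond accuracy, the fitted tree exposes featureImportances, which shows which of the seven assembled features the splits rely on most. A minimal sketch (run before sc.stop() in the script above; indices follow the VectorAssembler input order):

```python
feature_cols = ['Pclass', 'iSex', 'Age', 'SibSp', 'Parch', 'Fare', 'iEmbarked']
importances = treeModel.featureImportances  # a SparseVector indexed by feature position
for idx, name in enumerate(feature_cols):
    print(name, importances[idx])
```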
