admin 管理员组文章数量: 887021
【spark
能今天做好的事就不要等到明天。以梦为马,学习趁年华。
1、学习路线
一份好的roadmap很重要
2、技术笔记
2.1 RDD
RDD是弹性分布式数据集,是一组不可变的JVM对象的分布及,可以执行高速运算,是spark的核心。
2.1.1 创建RDD
# 集合 生成ParallelCollectionRDD
data = sc.parallelize([('alex',22),('alex',22),('alex',22),('alex',22)])
# 文件 4代表分区数 生成MapPartitionsRDD
data_from_file = sc.textFile('../data/users.txt.gz', 4)
- collect():可以将RDD 的所有元素返回给驱动程序,将其序列化成列表。
- take(1):取其中的一个值
- map(lambda:x…):在每个分区中,都会传递RDD的一个元素给lambda方法
2.1.2 转换
2.1.2.1 map
data_2014 = data_from_file.map(lambda row:int(row[16]))
2.1.2.2 filter
筛选
data_filter = data_from_file.filter(lambda row: row[16]=='2014')
data_filter.count()
2.1.2.3 flatMap
flatMap返回的是一个扁平的结果,而不是一个列表。
data_2014_flat = data_from_file.flatMap(lambda row: (wor[16], int(row[16])+1))
data_2014_flat.take(10)
2.1.2.4 distinct
该方法返回指定列中不同值的列表
distinct_gender = data_from_file.map(lambda row:row[5]).distinct()
distinct_gender.collect()
# ['-99', 'M', 'F']
2.1.2.5 sample
sample方法返回数据集的随机样本,第一个参数指定采样是否应该替换,第二个参数定义返回数据的占比,第三个是伪随机数生成器种子。
data_sample = data_from_file.sample(False, 0.2, 555)
2.1.2.6 letfOuterJoin
2.1.2.7 repartition
重新对数据集分区,改变数据集分区的数量,会重组数据。
rdd1 = rdd1.repatition(4)
len(rdd1.glom().collect())
glom()方法会产生一个列表,其中每个元素是指定分区中数据集的所有元素的另一个列表
2.1.3 操作
2.1.2.1 take
该方法优于collect方法,它只返回单个数据分区的前n行,而collect返回的是整个RDD。
data_first = data_from_file.take(1)
data_sample = data_from_file.takeSmaple(False, 1, 666) #取的一些随机数据
2.1.2.2 collect
该方法将所有rdd的元素返回给驱动程序
2.1.3.3 reduce
reduce方法使用指定的方法减少rdd中的元素
rdd1.map(lambda row:row[1]).reduce(lambda x, y:x+y)
- 先通过map转换,创建一个包含rdd1所有值的列表
- 然后用reduce方法对结果进行处理
- 在每个分区,reduce方法运行求和,将总和返回给最终节点
2.1.3.4 reduceByKey
与reduce类似,只是reduce依据key来进行reduce操作,最后返回的结果也是key-value对。
2.1.3.5 count
统计rdd的元素个数
data_reduce.count()
2.1.3.6 countByKey
根据key值进行计数
2.1.3.7 saveAsTextFile
让rdd保存为文本文件,每个文件一个分区
data.saveAsTextFile('../data/data.txt')
2.1.3.8 foreach
对rdd的每个元素,用迭代的方式应用相同的函数,和map比较,foreach按照一个接一个方式,对每一条记录应用一个定义好的函数
2.2 DataFrame
dataframe和catalyst优化器的意义是在和非优化的RDD查询比较时增加pyspark查询的性能。
2.2.1 创建DataFrame
- 通过SparkSession导入数据,来创建DataFrame
- 通过先创建RDD,再转换为一个DataFrame
rdd1 = sc.parallelize(("""{'id':'1','name':'katie','age':'19','eyeColor':'brown'}""","""{'id':'2','name':'Michael','age':'20','eyeColor':'green'}"""
))
df = spark.read.json(rdd1)
2.2.2 df的查询
2.2.2.1 df api查询
df.collect()
df.show(<n>)
df.take(n)
2.2.2.2 sql查询
spark.sql("select * from users").collect()
注意:collect方法,返回行对象列表所有的记录,需要注意对内存的使用管理。
2.2.3 RDD交互操作
2.2.2.3 反射推断
users.printSchema()
![image.png](.png#clientId=u253db374-a567-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=141&id=u8c9e10d3&margin=[object Object]&name=image.png&originHeight=247&originWidth=575&originalType=binary&ratio=1&rotation=0&showTitle=false&size=79230&status=done&style=none&taskId=u93cfe3d5-ec0b-4ca6-8bfc-f1d12cf2978&title=&width=329)
2.2.3.2 编程方式
通过spark sql 中引入数据类型,用编程方式来指定模式,生成csv数据
from pyspark.sql.types import *
rdd = sc.parallelize([(1, 'Katie', 19, 'brown'),(2, 'Michael', 22, 'brown'),(3, 'Simone', 23, 'blue')
])schema = StructType([StructField("id", LongType(), True),StructField("name", StringType(), True),StructField("age", LongType(), True),StructField("eyeColor", StringType(), True),
])users = spark.createDataFrame(rdd, schema) # 对RDD应用该模式并且创建DataFrame
users.createOrReplaceTempView("users") # 利用DataFrame创建一个临时视图
users.printSchema()
2.2.4 DataFrame api查询
2.2.4.1 df.count
users.count()
2.2.4.2 筛选
- df.select
- df.filter
- like
users.select('id', 'age').filter('age==22').show()
users.select(users.id, users.age).filter(users.age==22).show()
users.select('name', 'eyeColor').filter("eyeColor like 'b%'").show()
2.2.4 SQL查询
2.2.4.1 行数
spark.sql("select count(1) from users").show()
2.2.4.2 where语句
# 筛选age==22的人
spark.sql("select id, age from users where age == 22").show()
# 筛选眼睛颜色是b开头的用户
spark.sql("select name, eyeColor from users where eyeColor like 'b%'").show()
2.3 数据清洗
2.2.1 操作
2.3.1.1 df.dropDuplicates
去除重复数据
# subset参数来指定只处理id以外的列
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
2.3.1.2 pyspark.sql.funtions.fn
# 计算id的总数 count
# 计算id的唯一个数 countDistinct
import pyspark.sql.functions as fn
user.agg(fn.count('id').alias('count'),fn.countDistinct('id').alias('distinct')
).show()
2.3.1.3 df.withColumn
- df.withColumn 新增一行
- fn.monotonically_increasing_id 给每一条记录提供一个唯一并且递增的id
df.withColumn('new_id', fn.monotonically_increasing_id()).show()
![image.png](.png#clientId=ubf066af5-3ef9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=148&id=uf6a58a7f&margin=[object Object]&name=image.png&originHeight=251&originWidth=657&originalType=binary&ratio=1&rotation=0&showTitle=false&size=153365&status=done&style=none&taskId=uca636ab1-83d4-4fff-9083-44620a4fcec&title=&width=388.60003662109375)
2.3.1.4 (*[…])
- count(*[…])
- agg(*[…])
- select(*[…])
- *参数(列命的位置)指示该方法计算所有的列
- *之前的列指示agg方法将该列表处理为一组独立的参数传递给函数
df = spark.createDataFrame([(1, 143.5, 5.6, 28, 'M', 10000),(2, 176.5, 5.4, 45, 'M', None),(3, None, 5.2, None, None, None),(4, 180.2, 5.9, 33, 'F', None),(5, 166.5, None, 54, 'M', None),(6, 175.1, 5.3, None, 'F', 10000),
],['id','weight', 'height', 'age', 'gender', 'income'])
df.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c+"missing") for c in df.columns
]).show()
![image.png](.png#clientId=ubf066af5-3ef9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=86&id=u9efb25dd&margin=[object Object]&name=image.png&originHeight=108&originWidth=1160&originalType=binary&ratio=1&rotation=0&showTitle=false&size=121498&status=done&style=none&taskId=u882a7914-e3c7-4441-9565-23f6d57abe7&title=&width=928)
2.3.1.5 df.dropna
删除指定列为空值的行
# todo
df.select([ c for c in df.columns if c != "income"])
2.3.1.6 df.fillna
填充缺失值
2.3.1.7 df.approxQuantile
计算每个特征的上下截断点
- 参数1:需要统计的列命
- 参数2:0-1之间的数或者一个列表
- 参数3:每个度量的一个可接受的错误程度
cols = ['weight', 'height', 'age']
bounds = {}for col in cols:quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)IQR = quantiles[1]-quantiles[0]bounds[col] = [quantiles[0] - 1.5*IQR,quantiles[1] + 1.5*IQR]
bounds代码字典保存了每个特征的上下限
2.3.1.7 df.groupby
分组
2.3.1.8 df.describe
描述
2.3.1.9 df.agg
agg可以用的聚合函数有:
- avg
- count
- countDistinct
- first
- kurtosis:峰度
- max
- mean
- min
- skewness:偏度
- stddev:标准差
- stddev_pop
- stddev_samp
- sum
- sumDistinct
- var_pop
- var_smap
- variance:方差
# 检查偏度
user.agg({"weight":"skeness"}).show()
2.3.1.10 df.corr
协方差
2.4 MLLib
2.5 ML
ml包操作基于DataFrame。
2.5.1 介绍
- 转换器:transformer
- 评估器:estimator
- 管道:pipeline
2.5.1.1 转换器 transformer
- Binarizer:根据指定的阈值将连续变量转换为对应的二进制值
- Bucketizer
- ChiSqSelector
- CountVectorizer
- DCT
- ElementwiseProduct
- HashingTF
- IDF
- IndexToString
- MaxAbsScaler
- MinMaxScaler
- NGram
- Normalizer
- OneHotEncoder
- PCA
- QuantileDiscretizer
- RegexTokenizer
- RFormula
- SQLTransformer
- StandardScaler
- StopWordsRemover
- StringIndexer
- Tokenizer
- VetcorAssembler
- VectorIndexer
- VectorSlicer
- Word2Vec
2.5.1.2 评估器estimator
使用fit方法
- 分类
- LogisticRegression
- DecisionTreeClassifier
- GBTClassifier
- RandomForestClassifier
- NaiveBayes
- MultilayerPerceptronClassifier
- OneVsRest
- 回归
- AFTSurvivalRegression
- DecisionTreeRegressor
- GBTRegressor
- GerneralizedLinearRegression
- IsotonicRegression
- LinearRegression
- RandomForestRegressor
- 聚类
- BisectingKMeans
- KMeans
- GaussianMixture
- LDA
![image.png](.png#clientId=uc5eb8be8-35b9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=883&id=u519f4375&margin=[object Object]&name=image.png&originHeight=1104&originWidth=976&originalType=binary&ratio=1&rotation=0&showTitle=false&size=95996&status=done&style=none&taskId=u553a376a-c39b-4ba2-ae7e-a9e7d0e03f4&title=&width=780.8)
![image.png](.png#clientId=uc5eb8be8-35b9-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=697&id=u6fb106e7&margin=[object Object]&name=image.png&originHeight=871&originWidth=970&originalType=binary&ratio=1&rotation=0&showTitle=false&size=67925&status=done&style=none&taskId=u21072592-83b7-41ae-b982-fc5cdda518b&title=&width=776)
2.5.1.3 pipeline
管道对象的fit方法执行每个转换器的transform方法和所有评估器的fit方法。
2.5.2 实践
import os
import sys#下面这些目录都是你自己机器的Spark安装目录和Java安装目录
os.environ['SPARK_HOME'] = "/Users/***/spark-2.4.3-bin-hadoop2.7/"sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/bin")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/pyspark")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/lib/py4j-0.9-src.zip")
# sys.path.append("/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home")
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"from pyspark import SparkContext
from pyspark import SparkConfsc = SparkContext("local","testing")print (sc.version)
print ('hello world!!')import pyspark.sql.types as typ
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()labels = [('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),('BIRTH_PLACE', typ.StringType()),('MOTHER_AGE_YEARS', typ.IntegerType()),('FATHER_COMBINED_AGE', typ.IntegerType()),('CIG_BEFORE', typ.IntegerType()),('CIG_1_TRI', typ.IntegerType()),('CIG_2_TRI', typ.IntegerType()),('CIG_3_TRI', typ.IntegerType()),('MOTHER_HEIGHT_IN', typ.IntegerType()),('MOTHER_PRE_WEIGHT', typ.IntegerType()),('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),('MOTHER_WEIGHT_GAIN', typ.IntegerType()),('DIABETES_PRE', typ.IntegerType()),('DIABETES_GEST', typ.IntegerType()),('HYP_TENS_PRE', typ.IntegerType()),('HYP_TENS_GEST', typ.IntegerType()),('PREV_BIRTH_PRETERM', typ.IntegerType())
]schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels
])births = spark.read.csv('births_transformed.csv.gz', header=True, schema=schema)print ('births>>>', births)import pyspark.ml.feature as ftbirths = births \.withColumn( 'BIRTH_PLACE_INT', births['BIRTH_PLACE'] \.cast(typ.IntegerType()))
# 此处打印出来还是string
print ('births>>>', births)# 构建第一个转换器
encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT', outputCol='BIRTH_PLACE_VEC')print ('encoder>>>', encoder)
print ('encoder.getOutputCol():', encoder.getOutputCol())import pyspark as spark# 将所有的特征整和到一起
featuresCreator = ft.VectorAssembler(inputCols=[col[0] for col in labels[2:]] + \[encoder.getOutputCol()], outputCol='features'
)print ('featuresCreator:', featuresCreator)# 创建评估器
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(maxIter=10, regParam=0.01, labelCol='INFANT_ALIVE_AT_REPORT')
print ('logistic:', logistic)# 创建一个管道
from pyspark.ml import Pipelinepipeline = Pipeline(stages=[encoder, featuresCreator, logistic])# fit
births_train, births_test = births \.randomSplit([0.7, 0.3], seed=666)print ('births_train', births_train)
print ( 'births_test', births_test )# 运行管道,评估模型。
model = pipeline.fit(births_train)
test_model = model.transform(births_test)print ('test_model:', test_model)test_model.take(1)print ('test_model.take(1):', test_model.take(1))# 评估模型性能
import pyspark.ml.evaluation as evevaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))# 保存模型
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)# 在之前模型上继续训练
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline.fit(births_train).transform(births_test).take(1)# 保存整个模型
from pyspark.ml import PipelineModelmodelPath = './infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)loadedPipelineModel = PipelineModel.load(modelPath)
test_loadedModel = loadedPipelineModel.transform(births_test)
print ('test_loadedModel:', test_loadedModel)
2.5.3 超参调优
# 超参调优
import pyspark.ml.tuning as tune# 使用网格搜索
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')grid = tune.ParamGridBuilder() \.addGrid(logistic.maxIter, [2, 10, 50]) \.addGrid(logistic.regParam, [0.01, 0.05, 0.3]) \.build()evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='INFANT_ALIVE_AT_REPORT')cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator
)# 创建转换通道
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(births_train)# fit
cvModel = cv.fit(data_transformer.transform(births_train))data_train = data_transformer \.transform(births_test)
results = cvModel.transform(data_train)print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))# Train-Validation splitting
selector = ft.ChiSqSelector(numTopFeatures=5, featuresCol=featuresCreator.getOutputCol(), outputCol='selectedFeatures',labelCol='INFANT_ALIVE_AT_REPORT'
)logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',featuresCol='selectedFeatures'
)pipeline = Pipeline(stages=[encoder,featuresCreator,selector])
data_transformer = pipeline.fit(births_train)tvs = tune.TrainValidationSplit(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator
)tvsModel = tvs.fit(data_transformer \.transform(births_train)
)data_train = data_transformer \.transform(births_test)
results = tvsModel.transform(data_train)print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))
本文标签: spark
版权声明:本文标题:【spark 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/free/1699084709h327264.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论