admin 管理员组文章数量: 887021
Spark中 DataFrame,DataSet,RDD
(1)DataSet和RDD
RDD :仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义
大数据的框架许多都要把内存中的数据往磁盘里写,所以DataSet取代rdd和dataframe。
因为,现阶段底层序列化机制使用的是java的或者Kryo的形式。
但是,java序列化出来的数据很大,影响存储Kryo对于小数据量的处理很好,
但是数据量一大,又会出现问题,所以官方的解决方法是使用自定义的编码器(Encoder)去序列化.
DataSetRow :运行时类型检查,比如 salary 是字符串类型,下面语句也只有运行时才进行类型检查。
(2)DataSet和DataFrame
DataSet跟DataFrame还是有挺大区别的,DataFrame开发都是写sql,
但是DataSet是使用类似RDD的API。所以可以理解成DataSet就是存了个数据类型的RDD
DataFrame:
由于 RDD 的局限性,Spark 产生了 DataFrame,DataFrame=RDD+Schema,Schema 是就是元数据,是语义描述信息。
DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
DataFrame 自带优化器 Catalyst,可以自动优化程序
DataFrame 提供了一整套的 Data Source API
DataSet\DataFrame\RDD的区别:
(1)相同点:
都是分布式数据集
DataFrame底层是RDD,但是DataSet不是,不过他们最后都是转换成RDD运行
DataSet和DataFrame的相同点都是有数据特征、数据类型的分布式数据集(schema)
(2)不同点:
(a)schema信息:
RDD中的数据是没有数据类型的
DataFrame中的数据是弱数据类型,不会做数据类型检查
虽然有schema规定了数据类型,但是编译时是不会报错的,运行时才会报错
DataSet中的数据类型是强数据类型
(b)序列化机制:
RDD和DataFrame默认的序列化机制是java的序列化,可以修改为Kyro的机制
DataSet使用自定义的数据编码器进行序列化和反序列化
三者的共性:
RDD,DataFrame,Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供遍历
三者都有惰性机制,在进行Transformation时不会执行,只有在遇到Action 如foreach时,三者才会开始遍历运算
三者都会根据spark的内存情况自动缓存运算,不需要担心内存溢出
三者都有partition,和许多共同函数,如filter,排序等
对DataFrame和Dataset进行操作,需要操作都需要这个包进行支持
Spark中所有功能的入口点是SparkSession类。要创建一个基本的SparkSession,只需使用sparkssession .builder():
DataFrame(DF):在Spark1.3之前版本中,用户使用Spark SQL时需要直接操作RDD API,学习成本相对较高,
代码结构相对复杂,为了提高任务执行性能,用户还需要掌握一些调优手段,Spark从1.3版本引入DataFrame,
DataFrame是一种带有Schema元信息的分布式数据集,类似于传统数据库中的二维表,定义有字段名称和类型,
用户可以像操作数据库表一样使用DataFrame,DataFrame的开发API简洁高效,代码结构清晰,并且Spark针对
DataFrame的操作进行了丰富的优化。DataFrame支持Java、Scala、Pythont等多种开发语言,不论是专业的开发
人员,还是数据分析人员都可以轻松地使用DataFrame处理结构化数据。
>RDD转化DataFrame2种方式:
1.RDD->DataFrame
通过spark.createDataFrame方法将现有RDD转化为DataFrame
val spark = SparkSession.builder().appName("").Master("local").getOrCreate()
val rdd = spark.SparkContext.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2)))
val rdd1 =spark.createDataFrame(rdd)
+------+---+---+
| _1| _2| _3|
+------+---+---+
| aene| 22| NY|
| joe| 39| CO|
|alison| 35| NY|
可以用_1,_2表示列 如:rdd1.select("_1","_2").where("_1 like '%o%'").show
+------+---+
| _1| _2|
+------+---+
| joe| 39|
|alison| 35|
| bob| 71|
+------+---+
补充:
通过定义Case Class,使用反射推断Schema(Case Class方式)
case class Person(name:String,age:Int,address:String)
val spark =SparkSession.builder().master("local").appName("").getOrCreate()
/*spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val rdd = spark.sparkContext.textFile("file:///H:/users.txt").
map(_.split(" ")).
map(x=>Person(x(0),x(1).toInt,x(2))).toDF()*/
下面同理:
val rdd=sc.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(new Person(x(0),x(1).toInt,x(2))))
val rdd_df = spark.createDataFrame(rdd)
这两种写法就是toDF和CreateFrame()通过定义Case class 。
2.RDD->DataFrame
通过SparkSession的read()方法加载不同的数据源
val spark = SparkSession.builder().appName("").Master("local").getOrCreate()
spark.read.textFile("file:///H:/users.txt").toDF()
DataSet是一个特定域的强类型的不可变数据集,每个DataSet都有一个非类型化视图DataFrame
(DataFrame是DataSet[Row]的一种表示形式),DataFrame可以通过调用 as(Encoder)函数转换成DataSet,
而DataSet则可以调用toDf()函数转换成DataFrame,两者之间可以互相灵活转换.
操作DataSet可以像操作RDD一样使用各种转换(Transformation)算子并行操作,转换操作采用"惰性"执行方式
当调用Action算子时才会触发真正的计算执行.
>RDD转化为DataSet的2中方式
1.RDD->DataSet
使用toDS()方法对List()或者Seq()
val spark =SparkSession.builder().appName("").Master("local").getOrCreate()
val rdd = List(1,2,3).toDS
rdd.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
+-----+
2.RDD->DataSet
使用createDataSet方法实现
val spark = SparkSession.builder().master("local").appName("").getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("ERROR")
val rdd = sc.textFile("file:///H:/users.txt")
import spark.implicits._
val rdd_ds =spark.createDataset(rdd)
rdd_ds.show()
+-------------+
| value|
+-------------+
| aene 22 NY |
| joe 39 CO |
|alison 35 NY |
| mike 69 VA |
| marie 27 OR|
| jim 21 OR|
3.DataFrame与DataSet相互转化
需要导入隐式转换 import spark.implicts._
要转化为dataframe,就调用toDF
要转化为dataset,就调用toDS(dataframe无法直接转化为dataset)
要转化为rdd,就调用rdd
/*注意:
在构建dataset的时候,必须要给集合提供一个Encoder编码器,将对应的数据类型,转化为StructType元数据信息;
同时,为了能够使用这个Encoder,必须要导入sparksession中的隐式转换spark.implicts._二者缺一不可,否则异常
*/
********************************************************************** ????????????????????????
1.DataFrame->DataSet()
可以通过.createDataSet()方法将DataFrame转化为DataSet
********************************************************************** ????????????????????????
2.DataFrame->DataSet()
可以通过df.as[Person]
将DataFrame转换成DataSet,不过要求是DataFrame的数据类型必须是case class,
并且要求DataFrame的数据类型必须和case class一致(顺序也必须一致)
case class Person(name:String,age:String,address:String)
val spark = SparkSession.builder().appName("").master("local").getOrCreate()
import spark.implicits._
val rdd_df = spark.sparkContext.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2))).
toDF("name":String,"age":String,"address":String)
val rdd_ds =rdd_df.as[Person]
rdd_ds.show()
+------+---+-------+
| name|age|address|
+------+---+-------+
| aene| 22| NY|
| joe| 39| CO|
|alison| 35| NY|
| mike| 69| VA|
| marie| 27| OR|
| jim| 21| OR|
| bob| 71| CA|
注意:如果DataFrame数据类型不一致,会报以下错误。
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`name`' given input columns: [_1, _2, _3];
3.DataSet->DataFrame
可以通过.toDF()方法
val spark = SparkSession.builder().appName("").master("local").getOrCreate()
import spark.implicits._
val rdd_ds = spark.sparkContext.parallelize(List(1,2,3)).toDS()
val rdd_df = rdd_ds.toDF()
rdd_df.show()
4.DataSet->RDD
DataSet.rdd
5.DataFrame->RDD
DataFrame.rdd
本文标签: Spark中 DataFrame DataSet rdd
版权声明:本文标题:Spark中 DataFrame,DataSet,RDD 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.freenas.com.cn/jishu/1730954591h1413388.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论