admin 管理员组

文章数量: 887021

Spark中 DataFrame,DataSet,RDD

(1)DataSet和RDD

RDD :仅表示数据集,RDD 没有元数据,也就是说没有字段语义定义

大数据的框架许多都要把内存中的数据往磁盘里写,所以DataSet取代rdd和dataframe。
因为,现阶段底层序列化机制使用的是java的或者Kryo的形式。
但是,java序列化出来的数据很大,影响存储Kryo对于小数据量的处理很好,
但是数据量一大,又会出现问题,所以官方的解决方法是使用自定义的编码器(Encoder)去序列化.
​DataSetRow :运行时类型检查,比如 salary 是字符串类型,下面语句也只有运行时才进行类型检查。

(2)DataSet和DataFrame


 DataSet跟DataFrame还是有挺大区别的,DataFrame开发都是写sql,
 但是DataSet是使用类似RDD的API。所以可以理解成DataSet就是存了个数据类型的RDD

​DataFrame:
由于 RDD 的局限性,Spark 产生了 DataFrame,DataFrame=RDD+Schema,Schema 是就是元数据,是语义描述信息。
DataFrame 是一种特殊类型的 Dataset,DataSet[Row] = DataFrame
DataFrame 自带优化器 Catalyst,可以自动优化程序
DataFrame 提供了一整套的 Data Source API

DataSet\DataFrame\RDD的区别:


(1)相同点:
          都是分布式数据集
          DataFrame底层是RDD,但是DataSet不是,不过他们最后都是转换成RDD运行
          DataSet和DataFrame的相同点都是有数据特征、数据类型的分布式数据集(schema)
(2)不同点:
          (a)schema信息:
                RDD中的数据是没有数据类型的
                DataFrame中的数据是弱数据类型,不会做数据类型检查
                    虽然有schema规定了数据类型,但是编译时是不会报错的,运行时才会报错
                DataSet中的数据类型是强数据类型
          (b)序列化机制:
                RDD和DataFrame默认的序列化机制是java的序列化,可以修改为Kyro的机制
                DataSet使用自定义的数据编码器进行序列化和反序列化


​三者的共性:


RDD,DataFrame,Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供遍历
三者都有惰性机制,在进行Transformation时不会执行,只有在遇到Action 如foreach时,三者才会开始遍历运算
三者都会根据spark的内存情况自动缓存运算,不需要担心内存溢出
三者都有partition,和许多共同函数,如filter,排序等
对DataFrame和Dataset进行操作,需要操作都需要这个包进行支持


Spark中所有功能的入口点是SparkSession类。要创建一个基本的SparkSession,只需使用sparkssession .builder():

DataFrame(DF):在Spark1.3之前版本中,用户使用Spark SQL时需要直接操作RDD API,学习成本相对较高,
代码结构相对复杂,为了提高任务执行性能,用户还需要掌握一些调优手段,Spark从1.3版本引入DataFrame,
DataFrame是一种带有Schema元信息的分布式数据集,类似于传统数据库中的二维表,定义有字段名称和类型,
用户可以像操作数据库表一样使用DataFrame,DataFrame的开发API简洁高效,代码结构清晰,并且Spark针对
DataFrame的操作进行了丰富的优化。DataFrame支持Java、Scala、Pythont等多种开发语言,不论是专业的开发
人员,还是数据分析人员都可以轻松地使用DataFrame处理结构化数据。

>RDD转化DataFrame2种方式:
 1.RDD->DataFrame
通过spark.createDataFrame方法将现有RDD转化为DataFrame

val spark = SparkSession.builder().appName("").Master("local").getOrCreate()
val rdd = spark.SparkContext.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2)))
val rdd1 =spark.createDataFrame(rdd)

+------+---+---+
|    _1| _2| _3|
+------+---+---+
|  aene| 22| NY|
|   joe| 39| CO|
|alison| 35| NY|

可以用_1,_2表示列 如:rdd1.select("_1","_2").where("_1 like '%o%'").show
+------+---+
|    _1| _2|
+------+---+
|   joe| 39|
|alison| 35|
|   bob| 71|
+------+---+

补充:
通过定义Case Class,使用反射推断Schema(Case Class方式)
case class Person(name:String,age:Int,address:String)

val spark =SparkSession.builder().master("local").appName("").getOrCreate()
 /*spark.sparkContext.setLogLevel("ERROR")
 import spark.implicits._
  val rdd = spark.sparkContext.textFile("file:///H:/users.txt").
      map(_.split(" ")).
      map(x=>Person(x(0),x(1).toInt,x(2))).toDF()*/

      下面同理:
    val rdd=sc.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(new Person(x(0),x(1).toInt,x(2))))
    val rdd_df = spark.createDataFrame(rdd)
这两种写法就是toDF和CreateFrame()通过定义Case class 。

2.RDD->DataFrame

通过SparkSession的read()方法加载不同的数据源
val spark = SparkSession.builder().appName("").Master("local").getOrCreate()
spark.read.textFile("file:///H:/users.txt").toDF()


DataSet是一个特定域的强类型的不可变数据集,每个DataSet都有一个非类型化视图DataFrame
(DataFrame是DataSet[Row]的一种表示形式),DataFrame可以通过调用 as(Encoder)函数转换成DataSet,
而DataSet则可以调用toDf()函数转换成DataFrame,两者之间可以互相灵活转换.
操作DataSet可以像操作RDD一样使用各种转换(Transformation)算子并行操作,转换操作采用"惰性"执行方式
当调用Action算子时才会触发真正的计算执行.
>RDD转化为DataSet的2中方式


1.RDD->DataSet


使用toDS()方法对List()或者Seq()
val spark  =SparkSession.builder().appName("").Master("local").getOrCreate()
val rdd = List(1,2,3).toDS
rdd.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
+-----+


2.RDD->DataSet


使用createDataSet方法实现
val spark = SparkSession.builder().master("local").appName("").getOrCreate()
val sc = spark.sparkContext
 sc.setLogLevel("ERROR")
 val rdd = sc.textFile("file:///H:/users.txt")
  import spark.implicits._
  val rdd_ds  =spark.createDataset(rdd)
  rdd_ds.show()
+-------------+
|        value|
+-------------+
|  aene 22 NY |
|   joe 39 CO |
|alison 35 NY |
|  mike 69 VA |
|  marie 27 OR|
|    jim 21 OR|

3.DataFrame与DataSet相互转化

需要导入隐式转换 import spark.implicts._
要转化为dataframe,就调用toDF
要转化为dataset,就调用toDS(dataframe无法直接转化为dataset)
要转化为rdd,就调用rdd

/*注意:
在构建dataset的时候,必须要给集合提供一个Encoder编码器,将对应的数据类型,转化为StructType元数据信息;
同时,为了能够使用这个Encoder,必须要导入sparksession中的隐式转换spark.implicts._二者缺一不可,否则异常
*/
********************************************************************** ????????????????????????
1.DataFrame->DataSet()
可以通过.createDataSet()方法将DataFrame转化为DataSet
********************************************************************** ????????????????????????

2.DataFrame->DataSet()


可以通过df.as[Person]

将DataFrame转换成DataSet,不过要求是DataFrame的数据类型必须是case class,
并且要求DataFrame的数据类型必须和case class一致(顺序也必须一致)

case class Person(name:String,age:String,address:String)
val spark = SparkSession.builder().appName("").master("local").getOrCreate()
  import spark.implicits._
  val rdd_df = spark.sparkContext.textFile("file:///H:/users.txt").map(_.split(" ")).map(x=>(x(0),x(1),x(2))).
    toDF("name":String,"age":String,"address":String)
  val rdd_ds =rdd_df.as[Person]
    rdd_ds.show()
+------+---+-------+
|  name|age|address|
+------+---+-------+
|  aene| 22|     NY|
|   joe| 39|     CO|
|alison| 35|     NY|
|  mike| 69|     VA|
| marie| 27|     OR|
|   jim| 21|     OR|
|   bob| 71|     CA|

注意:如果DataFrame数据类型不一致,会报以下错误。
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`name`' given input columns: [_1, _2, _3];

3.DataSet->DataFrame


可以通过.toDF()方法
    val spark = SparkSession.builder().appName("").master("local").getOrCreate()
      import spark.implicits._
       val rdd_ds = spark.sparkContext.parallelize(List(1,2,3)).toDS()
       val rdd_df = rdd_ds.toDF()
        rdd_df.show()

4.DataSet->RDD
DataSet.rdd
5.DataFrame->RDD
DataFrame.rdd

本文标签: Spark中 DataFrame DataSet rdd