DataFrame简介

DataFrame是一个分布式数据组织成命名列的集合。概念上相当于一个表在一个关系数据库。DataFrames可以由一系列广泛的来源,例如:结构化数据文件,hive,外部数据库,或现有的RDD。

在Spark中,DataFrame(SchemaRDD)是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。读取数据和执行查询都会返回DataFrame。

DataFrame是一个由Row对象组成的RDD,附带包含每列数据类型的结构信息。

DataFrame方法概述

1.提供类SQL的函数,如select、group、count、filter等,让用户方便对数据进行操作。Spark SQL会将这些操作转化成RDD的操作,在Spark Core引擎上执行。
2.提供SQL方法,可以创建临时或者全局的数据表,直接写sql语句对数据进行操作。
3.提供与JDBC交互的方法,例如从mysql表读取数据,将结果写入mysql。
4.提供与Hive表交互的方法,可以从Hive表读取数据,将结果写入Hive表。

DataFrame的特点

通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点.DataFrame不是类型安全的, API也不是面向对象风格的.

DataFrame的创建

  1. 读取外部数据集
    csv,json,text,parquet(默认)

    1
    2
    3
    4
    5
    6
    val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
    // 方式一
    val dfJson: DataFrame = spark.read.json("src/data/resources/employees.json")
    // 方式二
    val dfJson2: DataFrame = spark.read.format("json").load("src/data/resources/employees.json")
    dfJson.show()
  2. createDataFrame方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // 自定义类型
    case class Users(name:String,age:Int)
    val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
    val sc = spark.sparkContext
    // 1.将一个自定义类型的Seq集合转化为DataFrame对象
    // 注:这里的自定义类型Users的字段不能用BigInt,要用Int
    val seq: Seq[Users] = Seq(Users("tom",12))
    val df2: DataFrame = spark.createDataFrame(seq)
    df2.show()
    println("==================")
    // 2.将一个自定义类型的RDD转化为DataFrame对象
    val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))
    val df3: DataFrame = spark.createDataFrame(rdd)
    df3.show()
    println("==================")
    // 3.将一个rdd结构化转变为DataFrame对象
    // 结构化信息
    val seqRow: Seq[Row] = Seq(Row(1,"lousen"),Row(2,"Mike"))
    val rddRow: RDD[Row] = sc.parallelize(seqRow)
    val schema: StructType = StructType(Array(StructField("id",IntegerType,false),StructField("name",StringType,true)))
    val df4: DataFrame = spark.createDataFrame(rddRow,schema)
    df4.show()
    println("==================")
    // 打印结构信息
    df4.printSchema()
  3. toDF(colNames:String…)方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 自定义类型
    case class Users(name:String,age:Int)
    val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
    val sc = spark.sparkContext
    val seq: Seq[Users] = Seq(Users("tom",12))
    val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))
    // 将一个自定义类型的rdd/Seq集合转化为DataFrame对象
    val df33 = rdd.toDF().show()
    val df333 = seq.toDF().show()
    // 指定列名
    // val df3333 = seq.toDF("name","age").show()
    println("------------toDF-------------")
  4. 从外部数据库中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
    // 从数据库读取数据
    val url = "jdbc:mysql://localhost:3306/hive?user=root&password=113549743"
    val tablename = "student"
    val prop = new java.util.Properties()
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    val jdbcDF = spark.read.jdbc(url,tablename,prop)
    println(jdbcDF.count())
    jdbcDF.show()
  5. 从外部hive表中

DataFrame的类sql方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 自定义类型
case class Users(name:String,age:Int)
val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
val sc = spark.sparkContext
// 将一个自定义类型的RDD转化为DataFrame对象
val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))
val df3: DataFrame = spark.createDataFrame(rdd)
// 类sql方法
// 1. 查看name列的数据
df3.select("name").show()
// 2. 查看多个列的数据
df3.select("name","age").show()
// 3. 获取某列的值
df3.select($"name").show()
// 4. 修改某列的值,并重命名新列
df3.select($"age"+1).alias("newAge").show() // alias可以替换为as
// 5. 过滤,三种实现方式
df3.filter("age > 30").show()
df3.filter($"age" > 30).show()
df3.filter(row => row.getInt(1) > 30).show()
// 6. 分组
df3.groupBy("name").count().show()
// 7. 打印结构信息
df3.printSchema()
// 8. 删除列
df3.drop("age")
// 9. 根据指定列排序
df3.sort("age")
// 10. 输出前n条数据
df3.select("age").show(10)
// 11. 输出文件到指定路径
df3.write().json(path)
df3.write().csv(path)
// 保存模式 mode(保存模式的类型)
// 追加 SaveMode.Append
// 覆盖 SaveMode.Overwrite
// 存在忽略 SaveMode.Ignore
// 存在报错 SaveMode.ErrorIfExists 默认类型
df3.write().mode(SaveMode.Append).save(path)

注:DataFrame也可以使用RDD的一些方法,如map、filter、reduce等。
注:但因为直接基于无类型的Row操作不方便,一般如果要使用这些方法,先转换成 强类型的Dataset(如Dataset),再使用。

DataFrame的sql查询

可以创建临时表或者全局表,采用SQL语句操作数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 自定义类型
case class Users(name:String,age:Int)
val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()
val sc = spark.sparkContext
// 将一个自定义类型的RDD转化为DataFrame对象
val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))
val df3: DataFrame = spark.createDataFrame(rdd)
// 1. 创建临时视图
df3.createOrReplaceTempView("user")
// 使用创建的临时视图进行sql查询
spark.sql("select name,count(*) from user group by name").show()
// 2. 创建全局临时视图
df3.createOrReplaceGlobalTempView("user2")
// 全局临时视图与系统保存的数据库global_temp相关联,我们必须使用限定名称来引用它
// 相比于临时表,可以跨不同SparkSession共享同一张表
spark.sql("select * from global_temp.user2").show()
// 3. warehouse
df3.write.saveAsTable("user")