DataFrame简介
DataFrame是一个分布式数据组织成命名列的集合。概念上相当于一个表在一个关系数据库。DataFrames可以由一系列广泛的来源,例如:结构化数据文件,hive,外部数据库,或现有的RDD。
在Spark中,DataFrame(SchemaRDD)是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。读取数据和执行查询都会返回DataFrame。
DataFrame是一个由Row对象组成的RDD,附带包含每列数据类型的结构信息。
DataFrame方法概述
1.提供类SQL的函数,如select、group、count、filter等,让用户方便对数据进行操作。Spark SQL会将这些操作转化成RDD的操作,在Spark Core引擎上执行。
2.提供SQL方法,可以创建临时或者全局的数据表,直接写sql语句对数据进行操作。
3.提供与JDBC交互的方法,例如从mysql表读取数据,将结果写入mysql。
4.提供与Hive表交互的方法,可以从Hive表读取数据,将结果写入Hive表。
DataFrame的特点
通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点.DataFrame不是类型安全的, API也不是面向对象风格的.
DataFrame的创建
读取外部数据集
csv,json,text,parquet(默认)123456val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()// 方式一val dfJson: DataFrame = spark.read.json("src/data/resources/employees.json")// 方式二val dfJson2: DataFrame = spark.read.format("json").load("src/data/resources/employees.json")dfJson.show()createDataFrame方法
123456789101112131415161718192021222324252627282930// 自定义类型case class Users(name:String,age:Int)val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()val sc = spark.sparkContext// 1.将一个自定义类型的Seq集合转化为DataFrame对象// 注:这里的自定义类型Users的字段不能用BigInt,要用Intval seq: Seq[Users] = Seq(Users("tom",12))val df2: DataFrame = spark.createDataFrame(seq)df2.show()println("==================")// 2.将一个自定义类型的RDD转化为DataFrame对象val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))val df3: DataFrame = spark.createDataFrame(rdd)df3.show()println("==================")// 3.将一个rdd结构化转变为DataFrame对象// 结构化信息val seqRow: Seq[Row] = Seq(Row(1,"lousen"),Row(2,"Mike"))val rddRow: RDD[Row] = sc.parallelize(seqRow)val schema: StructType = StructType(Array(StructField("id",IntegerType,false),StructField("name",StringType,true)))val df4: DataFrame = spark.createDataFrame(rddRow,schema)df4.show()println("==================")// 打印结构信息df4.printSchema()toDF(colNames:String…)方法
123456789101112131415// 自定义类型case class Users(name:String,age:Int)val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()val sc = spark.sparkContextval seq: Seq[Users] = Seq(Users("tom",12))val rdd: RDD[Users] = sc.parallelize(Seq(Users("jerry",20)))// 将一个自定义类型的rdd/Seq集合转化为DataFrame对象val df33 = rdd.toDF().show()val df333 = seq.toDF().show()// 指定列名// val df3333 = seq.toDF("name","age").show()println("------------toDF-------------")从外部数据库中
123456789val spark = SparkSession.builder().master("local[*]").appName("dataframe").getOrCreate()// 从数据库读取数据val url = "jdbc:mysql://localhost:3306/hive?user=root&password=113549743"val tablename = "student"val prop = new java.util.Properties()prop.setProperty("driver","com.mysql.jdbc.Driver")val jdbcDF = spark.read.jdbc(url,tablename,prop)println(jdbcDF.count())jdbcDF.show()从外部hive表中
DataFrame的类sql方法
|
|
注:DataFrame也可以使用RDD的一些方法,如map、filter、reduce等。
注:但因为直接基于无类型的Row操作不方便,一般如果要使用这些方法,先转换成 强类型的Dataset(如Dataset
DataFrame的sql查询
可以创建临时表或者全局表,采用SQL语句操作数据。
最后更新: 2018年11月27日 11:17
原始链接: https://www.lousenjay.top/2018/10/21/Spark入门详解(五)-DataFrame编程/