一、Spark SQL的概念理解
Spark SQL是spark套件中一個模板,它將數據的計算任務通過SQL的形式轉換成了RDD的計算,類似于Hive通過SQL的形式將數據的計算任務轉換成了MapReduce。
Spark SQL的特點:
和Spark Core的無縫集成,可以在寫整個RDD應用的時候,配置Spark SQL來完成邏輯實現。
統一的數據訪問方式,Spark SQL提供標準化的SQL查詢。
Hive的繼承,Spark SQL通過內嵌的hive或者連接外部已經部署好的hive案例,實現了對hive語法的繼承和操作。
標準化的連接方式,Spark SQL可以通過啟動thrift Server來支持JDBC、ODBC的訪問,將自己作為一個BI Server使用
Spark SQL數據抽象:
RDD(Spark1.0)-》DataFrame(Spark1.3)-》DataSet(Spark1.6)
Spark SQL提供了DataFrame和DataSet的數據抽象
DataFrame就是RDD+Schema,可以認為是一張二維表格,劣勢在于編譯器不進行表格中的字段的類型檢查,在運行期進行檢查
DataSet是Spark最新的數據抽象,Spark的發展會逐步將DataSet作為主要的數據抽象,弱化RDD和DataFrame.DataSet包含了DataFrame所有的優化機制。除此之外提供了以樣例類為Schema模型的強類型
DataFrame=DataSet[Row]
DataFrame和DataSet都有可控的內存管理機制,所有數據都保存在非堆上,都使用了catalyst進行SQL的優化。
Spark SQL客戶端查詢:
可以通過Spark-shell來操作Spark SQL,spark作為SparkSession的變量名,sc作為SparkContext的變量名
可以通過Spark提供的方法讀取json文件,將json文件轉換成DataFrame
可以通過DataFrame提供的API來操作DataFrame里面的數據。
可以通過將DataFrame注冊成為一個臨時表的方式,來通過Spark.sql方法運行標準的SQL語句來查詢。
二、Spark SQL查詢方式
DataFrame查詢方式
DataFrame支持兩種查詢方式:一種是DSL風格,另外一種是SQL風格
(1)、DSL風格:
需要引入import spark.implicit. _ 這個隱式轉換,可以將DataFrame隱式轉換成RDD
(2)、SQL風格:
a、需要將DataFrame注冊成一張表格,如果通過CreateTempView這種方式來創建,那么該表格Session有效,如果通過CreateGlobalTempView來創建,那么該表格跨Session有效,但是SQL語句訪問該表格的時候需要加上前綴global_temp
b、需要通過sparkSession.sql方法來運行你的SQL語句
DataSet查詢方式
定義一個DataSet,先定義一個Case類
三、DataFrame、Dataset和RDD互操作
RDD-》DataFrame
普通方式:例如rdd.map(para(para(0).trim(),para(1).trim().toInt)).toDF(“name”,“age”)
通過反射來設置schema,例如:
#通過反射設置schema,數據集是spark自帶的people.txt,路徑在下面的代碼中case class Person(name:String,age:Int)
val peopleDF=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”).map(_.split(“,”)).map(para=》Person(para(0).trim,para(1).trim.toInt)).toDF
peopleDF.show
#注冊成一張臨時表
peopleDF.createOrReplaceTempView(“persons”)
val teen=spark.sql(“select name,age from persons where age between 13 and 29”)
teen.show
這時teen是一張表,每一行是一個row對象,如果需要訪問Row對象中的每一個元素,可以通過下標 row(0);你也可以通過列名 row.getAs[String](“name”)
也可以使用getAs方法:
3、通過編程的方式來設置schema,適用于編譯器不能確定列的情況
val peopleRDD=spark.sparkContext.textFile(“file:///root/spark/spark2.4.1/examples/src/main/resources/people.txt”)
val schemaString=“name age”
val filed=schemaString.split(“ ”).map(filename=》 org.apache.spark.sql.types.StructField(filename,org.apache.spark.sql.types.StringType,nullable = true))
val schema=org.apache.spark.sql.types.StructType(filed)
peopleRDD.map(_.split(“,”)).map(para=》org.apache.spark.sql.Row(para(0).trim,para(1).trim))
val peopleDF=spark.createDataFrame(res6,schema)
peopleDF.show
DataFrame-》RDD
dataFrame.rdd
RDD-》DataSet
rdd.map(para=》 Person(para(0).trim(),para(1).trim().toInt)).toDS
DataSet-》DataSet
dataSet.rdd
DataFrame -》 DataSet
dataFrame.to[Person]
DataSet -》 DataFrame
dataSet.toDF
四、用戶自定義函數
用戶自定義UDF函數
通過spark.udf功能用戶可以自定義函數
自定義udf函數:
通過spark.udf.register(name,func)來注冊一個UDF函數,name是UDF調用時的標識符,fun是一個函數,用于處理字段。
需要將一個DF或者DS注冊為一個臨時表
通過spark.sql去運行一個SQL語句,在SQL語句中可以通過name(列名)方式來應用UDF函數
用戶自定義聚合函數
1. 弱類型用戶自定義聚合函數
新建一個Class 繼承UserDefinedAggregateFunction ,然后復寫方法:
//聚合函數需要輸入參數的數據類型
override def inputSchema: StructType = ???
//可以理解為保存聚合函數業務邏輯數據的一個數據結構
override def bufferSchema: StructType = ???
// 返回值的數據類型
override def dataType: DataType = ???
// 對于相同的輸入一直有相同的輸出
override def deterministic: Boolean = true
//用于初始化你的數據結構
override def initialize(buffer: MutableAggregationBuffer): Unit = ???
//用于同分區內Row對聚合函數的更新操作
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = ???
//用于不同分區對聚合結果的聚合。
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = ???
//計算最終結果
override def evaluate(buffer: Row): Any = ???
你需要通過spark.udf.resigter去注冊你的UDAF函數。
需要通過spark.sql去運行你的SQL語句,可以通過 select UDAF(列名) 來應用你的用戶自定義聚合函數。
2、強類型用戶自定義聚合函數
新建一個class,繼承Aggregator[Employee, Average, Double],其中Employee是在應用聚合函數的時候傳入的對象,Average是聚合函數在運行的時候內部需要的數據結構,Double是聚合函數最終需要輸出的類型。這些可以根據自己的業務需求去調整。復寫相對應的方法:
//用于定義一個聚合函數內部需要的數據結構
override def zero: Average = ???
//針對每個分區內部每一個輸入來更新你的數據結構
override def reduce(b: Average, a: Employee): Average = ???
//用于對于不同分區的結構進行聚合
override def merge(b1: Average, b2: Average): Average = ???
//計算輸出
override def finish(reduction: Average): Double = ???
//用于數據結構他的轉換
override def bufferEncoder: Encoder[Average] = ???
//用于最終結果的轉換
override def outputEncoder: Encoder[Double] = ???
新建一個UDAF實例,通過DF或者DS的DSL風格語法去應用。
五、Spark SQL和Hive的繼承
1、內置Hive
Spark內置有Hive,Spark2.1.1 內置的Hive是1.2.1。
需要將core-site.xml和hdfs-site.xml 拷貝到spark的conf目錄下。如果Spark路徑下發現metastore_db,需要刪除【僅第一次啟動的時候】。
在你第一次啟動創建metastore的時候,你需要指定spark.sql.warehouse.dir這個參數, 比如:bin/spark-shell --conf spark.sql.warehouse.dir=hdfs://master01:9000/spark_warehouse
注意,如果你在load數據的時候,需要將數據放到HDFS上。
2、外部Hive(這里主要使用這個方法)
需要將hive-site.xml 拷貝到spark的conf目錄下。
如果hive的metestore使用的是mysql數據庫,那么需要將mysql的jdbc驅動包放到spark的jars目錄下。
可以通過spark-sql或者spark-shell來進行sql的查詢。完成和hive的連接。
這就是hive里面的表
六、Spark SQL的數據源
1、輸入
對于Spark SQL的輸入需要使用sparkSession.read方法
通用模式 sparkSession.read.format(“json”).load(“path”) 支持類型:parquet、json、text、csv、orc、jdbc
專業模式 sparkSession.read.json、 csv 直接指定類型。
2、輸出
對于Spark SQL的輸出需要使用 sparkSession.write方法
通用模式 dataFrame.write.format(“json”).save(“path”) 支持類型:parquet、json、text、csv、orc
專業模式 dataFrame.write.csv(“path”) 直接指定類型
如果你使用通用模式,spark默認parquet是默認格式、sparkSession.read.load 加載的默認是parquet格式dataFrame.write.save也是默認保存成parquet格式。
如果需要保存成一個text文件,那么需要dataFrame里面只有一列(只需要一列即可)。
七、Spark SQL實戰
1、數據說明
這里有三個數據集,合起來大概有幾十萬條數據,是關于貨品交易的數據集。
2、任務
這里有三個需求:
計算所有訂單中每年的銷售單數、銷售總額
計算所有訂單每年最大金額訂單的銷售額
計算所有訂單中每年最暢銷貨品
3、步驟
1. 加載數據
tbStock.txt
#代碼case class tbStock(ordernumber:String,locationid:String,dateid:String) extends Serializable
val tbStockRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStock.txt”)
val tbStockDS=tbStockRdd.map(_.split(“,”)).map(attr=》tbStock(attr(0),attr(1),attr(2))).toDS
tbStockDS.show()
tbStockDetail.txt
case class tbStockDetail(ordernumber:String,rownum:Int,itemid:String,number:Int,price:Double,amount:Double) extends Serializable
val tbStockDetailRdd=spark.sparkContext.textFile(“file:///root/dataset/tbStockDetail.txt”)
val tbStockDetailDS=tbStockDetailRdd.map(_.split(“,”)).map(attr=》tbStockDetail(attr(0),attr(1).trim().toInt,attr(2),attr(3).trim().toInt,attr(4).trim().toDouble,attr(5).trim().toDouble)).toDS
tbStockDetailDS.show()
tbDate.txt
case class tbDate(dateid:String,years:Int,theyear:Int,month:Int,day:Int,weekday:Int,week:Int,quarter:Int,period:Int,halfmonth:Int) extends Serializable
val tbDateRdd=spark.sparkContext.textFile(“file:///root/dataset/tbDate.txt”)
val tbDateDS=tbDateRdd.map(_.split(“,”)).map(attr=》tbDate(attr(0),attr(1).trim().toInt,attr(2).trim().toInt,attr(3).trim().toInt,attr(4).trim().toInt,attr(5).trim().toInt,attr(6).trim().toInt,attr(7).trim().toInt,attr(8).trim().toInt,attr(9).trim().toInt)).toDS
tbDateDS.show()
2. 注冊表
tbStockDS.createOrReplaceTempView(“tbStock”)
tbDateDS.createOrReplaceTempView(“tbDate”)
tbStockDetailDS.createOrReplaceTempView(“tbStockDetail”)
3. 解析表
計算所有訂單中每年的銷售單數、銷售總額
#sql語句
select c.theyear,count(distinct a.ordernumber),sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear
order by c.theyear
計算所有訂單每年最大金額訂單的銷售額
a、先統計每年每個訂單的銷售額
select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber
b、計算最大金額訂單的銷售額
select d.theyear,c.SumOfAmount as SumOfAmount
from
(select a.dateid,a.ordernumber,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
group by a.dateid,a.ordernumber) c
join tbDate d on c.dateid=d.dateid
group by d.theyear
order by theyear desc
計算所有訂單中每年最暢銷貨品
a、求出每年每個貨品的銷售額
select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid
b、在a的基礎上,統計每年單個貨品的最大金額
select d.theyear,max(d.SumOfAmount) as MaxOfAmount
from
(select c.theyear,b.itemid,sum(b.amount) as SumOfAmount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by theyear
c、用最大銷售額和統計好的每個貨品的銷售額join,以及用年join,集合得到最暢銷貨品那一行信息
select distinct e.theyear,e.itemid,f.maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) e
join
(select d.theyear,max(d.sumofamount) as maxofamount
from
(select c.theyear,b.itemid,sum(b.amount) as sumofamount
from tbStock a
join tbStockDetail b on a.ordernumber=b.ordernumber
join tbDate c on a.dateid=c.dateid
group by c.theyear,b.itemid) d
group by d.theyear) f on e.theyear=f.theyear
and e.sumofamount=f.maxofamount order by e.theyear
編輯:jq
-
數據
+關注
關注
8文章
7122瀏覽量
89356 -
SQL
+關注
關注
1文章
772瀏覽量
44206 -
函數
+關注
關注
3文章
4344瀏覽量
62853 -
RDD
+關注
關注
0文章
7瀏覽量
7986
原文標題:Spark SQL 重點知識總結
文章出處:【微信號:DBDevs,微信公眾號:數據分析與開發】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論