理解SparkSQL(⼆)——SQLContext和HiveContext
使⽤Spark SQL,除了使⽤之前介绍的⽅法,实际上还可以使⽤SQLContext或者HiveContext通过编程的⽅式实现。前者⽀持SQL语法解析器(SQL-92语法),后者⽀持SQL语
法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,⽤户可以通过配置切换成SQL语法解析器来运⾏HiveQL不⽀持的语法,如:select 1。实际上HiveContext是SQLContext的⼦类,因此在HiveContext运⾏过程中除了override的函数和变量,可以使⽤和SQLContext⼀样的函数和变量。
因为spark-shell⼯具实际就是运⾏的scala程序⽚段,为了⽅便,下⾯采⽤spark-shell进⾏演⽰。
⾸先来看SQLContext,因为是标准SQL,可以不依赖于Hive的metastore,⽐如下⾯的例⼦(没有启动hive metastore):
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn --conf spark.sql.catalogImplementation=in-memory
scala> case class offices(office:Int,city:String,region:String,mgr:Int,target:Double,sales:Double)
defined class offices
scala> val
File("/user/hive/warehouse/orderdb.db/").map(_.split("\t")).map(p=>offices(p(0).Int,p(1),p(2),p(3).Int,p(4).Double,p(5).Double)) rddOffices: org.apache.spark.rdd.RDD[offices] = MapPartitionsRDD[3] at map at <console>:26
scala> val officesDataFrame = ateDataFrame(rddOffices)
officesDataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]
scala> ateOrReplaceTempView("offices")
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
scala>
执⾏上⾯的命令后,实际上在yarn集中启动了⼀个yarn client模式的Spark Application,然后在scala>提⽰符后输⼊的语句会⽣成RDD的transformation,最后⼀条命令中的
collect会⽣成RDD的action,即会触发Job的提交和程序的执⾏。
命令⾏中之所以加上--conf spark.sql.catalogImplementation=in-memory选项,是因为spark-shell中的默认启动的SparkSession对象spark是默认⽀持Hive的,不带这个选项启动
的话,程序就会去连接hive metastore,因为这⾥并没有启动hive metastore,因此程序在执⾏createDataFrame函数时会报错。
程序中的第⼀⾏是1个case class语句,这⾥是定义后⾯的数据⽂件的模式的(定义模式除了这个⽅法,其实还有另外⼀种⽅法,后⾯再介绍)。第⼆⾏从hdfs中读取⼀个⽂本⽂
件,并⼯通过map映射到了模式上⾯。第三⾏基于第⼆⾏的RDD⽣成DataFrame,第四⾏基于第三⾏的DataFrame注册了⼀个逻辑上的临时表,最后⼀⾏就可以通过SparkSession的sql函数来执⾏sql语句了。
实际上,SQLContext是Spark 1.x中的SQL⼊⼝,在Spark 2.x中,使⽤SparkSession作为SQL的⼊⼝,但是为了向后兼容,Spark 2.x仍然⽀持SQLContext来操作SQL,不过会
提⽰deprecated,所以上⾯的例⼦是采⽤Spark 2.x中的写法。
实际上还有另外⼀种⽅法来操作SQL,针对同样的数据,例如:
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> import org.apache.pes._
import org.apache.pes._
scala> val schema = new StructType(Array(StructField("office", IntegerType, false), StructField("city", StringType, false), StructField("region", StringType, false),
StructField("mgr", IntegerType, true), StructField("target", DoubleType, true), StructField("sales", DoubleType, false)))
scala不是内部或外部命令
schema: org.apache.pes.StructType = StructType(StructField(office,IntegerType,false), StructField(city,StringType,false), StructField(region,StringType,false), StructField(mgr,IntegerType,tr
ue), StructField(target,DoubleType,true), StructField(sales,DoubleType,false))
scala> val rowRDD = sc.textFile("/user/hive/warehouse/orderdb.db/").map(_.split("\t")).map(p =>
Row(p(0).Int,p(1),p(2),p(3).Int,p(4).Double,p(5).Double))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[3] at map at <console>:30
scala> val dataFrame = ateDataFrame(rowRDD, schema)
dataFrame: org.apache.spark.sql.DataFrame = [office: int, city: string ... 4 more fields]
scala> ateOrReplaceTempView("offices")
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
这个例⼦与之前的例⼦有⼀些不同,主要的地⽅有3个:
1. 之前的例⼦是采⽤case class定义模式,Spark采⽤反射来推断Schema;⽽这个例⼦采⽤StructType类型的对象来定义模式,它接收⼀个数组,数组成员是StructField对象,
代表⼀个字段的定义,每个字段的定义由字段名称、字段类型和是否允许为空组成;
2. 对于代表数据的RDD,之前的例⼦是直接⽤case class定义的类型来分割字段,⽽这个例⼦是⽤的Row类型;
3. 在使⽤createDataFrame函数⽣成DataFrame时,该函数的参数不⼀样,之前的例⼦只要传⼊RDD对象即可(对象中隐含了模式),⽽这个例⼦需要同时传⼊RDD和定义的schema;
实际编程中建议采⽤第⼆种⽅法,因为其更加灵活,schema信息可以不必是写死的,⽽是可以在程序运⾏的过程中⽣成。
下⾯接着来看HiveContext的⽤法,使⽤HiveContext之前需要确保:
使⽤的Spark是⽀持Hive的;
Hive的配置⽂件l已经在Spark的conf⽬录下;
hive metastore已经启动;
举例说明:
⾸先启动hive metastore:
[root@BruceCentOS ~]# nohup hive --service metastore &
然后仍然通过spark-shell来举例说明,启动spark-shell,如下所⽰:
[root@BruceCentOS4 ~]# $SPARK_HOME/bin/spark-shell --master yarn
scala> spark.sql("show databases").collect.foreach(println)
[default]
[orderdb]
scala> spark.sql("use orderdb")
res2: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show tables").collect.foreach(println)
[orderdb,customers,false]
[orderdb,offices,false]
[orderdb,orders,false]
[orderdb,products,false]
[orderdb,salesreps,false]
scala> spark.sql("select city from offices where region='Eastern'").map(t=>"City: " + t(0)).collect.foreach(println)
City: NewYork
City: Chicago
City: Atlanta
scala>
可以看到这次启动spark-shell没有带上最后那个选项,这是因为这⾥我们打算⽤HiveContext来操作Hive中的数据,需要⽀持Hive。前⾯说过spark-shell是默认开启了Hive⽀持的。同SQLContext类似,Spark 2.x中也不需要再⽤HiveContext对象来操作SQL了,直接⽤SparkSession对象来操作就好了。可以看到这⾥可以直接操作表,不⽤再定义schema,这是因为schema是由外部的hive metastore定义的,spark通过连接到hive metastore来读取表的schema信息,因此这⾥能直接操作SQL。
另外,除了上⾯的使⽤SQLContext操作普通⽂件(需要额外定义模式)和使⽤HiveContext操作Hive表数据(需要开启hive metastore)之外,SQLContext还能操作JSON、PARQUET等⽂件,由于这两种数据⽂件⾃⼰带了模式信息,因此可以直接基于⽂件创建DataFrame,例如:
scala> val df = ad.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select name,age from people where age>19").map(t=>"Name :" + t(0) + ", Age: " + t(1)).collect.foreach(println)
Name :Andy, Age: 30
最后来看下DataFrame的另⼀种叫做DSL(Domain Specific Language)的⽤法。
scala> val df = ad.json("file:///opt/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age|  name|
+----+-------+
|null|Michael|
|  30|  Andy|
|  19| Justin|
+----+-------+
scala> df.select("name").show()
+-------+
|  name|
+-------+
|Michael|
|  Andy|
| Justin|
+-------+
scala> df.select(df("name"), df("age") + 1).show()
+-------+---------+
|  name|(age + 1)|
+-------+---------+
|Michael|    null|
|  Andy|      31|
| Justin|      20|
+-------+---------+
scala> df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
scala>
以上是对Spark SQL的SQLContext和HiveContext基本⽤法的⼀些总结,都是采⽤spark-shell⼯具举的例⼦。实际上由于spark-shell是运⾏scala程序⽚段的⼯具,上述例⼦完全可以改成独⽴的应⽤程序。我将在下⼀篇博⽂当中尝试使⽤Scala、Java和Python来编写独⽴的程序来操作上⾯的⽰例hive数据库orderdb,可以适当使⽤⼀些较为复杂的SQL来统计分析数据。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。