spark读取hive数据的两种⽅式
简述
spark读取hive数据的两种⽅式
⼀是通过访问hive metastore的⽅式,这种⽅式通过访问hive的metastore元数据的⽅式获取表结构信息和该表数据所存放的HDFS路径,这种⽅式的特点是效率⾼、数据吞吐量⼤、使⽤spark操作起来更加友好。
⼆是通过spark jdbc的⽅式访问,就是通过链接hiveserver2的⽅式获取数据,这种⽅式底层上跟spark链接其他rdbms上⼀样,可以采⽤sql的⽅式先在其数据库中查询出来结果再获取其结果数据,这样⼤部分数据计算的压⼒就放在了数据库上。
两种⽅式的实现
⽅式⼀:直接采⽤spark on hive的⽅式读取
这种⽅式只适⽤在服务器上提交spark-submit时读取本集hive中的数据,后⾯会写⼀篇spark任务读取不同集中的hive数据⽅法。
这种⽅式实现起来很简单,在构建SparkSession的时候设置
enableHiveSupport()
样例:
val spark = SparkSession.builder()
.appName("test")
.enableHiveSupport()
.getOrCreate()
这样你的SparkSession在使⽤sql的时候会去集hive中的库表,加载其hdfs数据与其元数据组成DataFrame
val df = spark.sql("select * from test.user_info")
⽅式⼆:采⽤spark jdbc的⽅式
这种⽅式并不是⼤数据的主流⽅法,并不是经常使⽤,能采⽤第⼀种⽅法最好,但是如果有特别的使⽤场景的话也可以通过这种⽅法来实现。
直接使⽤spark jdbc读取hive数据
val df = ad
.format("jdbc")
.option("driver","org.apache.hive.jdbc.HiveDriver")
.option("url","jdbc:hive2://xxx:10000/")
.option("user","hive")
.option("password","xxx")
.option("fetchsize", "2000")
.option("dbtable","test.user_info")
.load()
df.show(10)
会有⼀个现象,DataFrame中只有该表的表结构,并没有该表的真实数据。
虽然原理⼀样,但是hive与spark通过jdbc连接其他的rdbms还有点不同,在spark源码中可以看出来,并没有hive相关的dialect⽤来注册。
需要⼿动的加⼀点料
def register(): Unit = {
}
case object HiveSqlDialect extends JdbcDialect {
session如何设置和读取override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
override def quoteIdentifier(colName: String): String = {
colName.split('.').map(part => s"`$part`").mkString(".")
}
}
在使⽤spark jdbc之前调⽤register()⽅法⼿动注册即可
(注意:jdbc读取hive时需要加上.option("fetchsize", 每处理批次的条数),不然同样可能会出现不显⽰数据的问题)完整代码
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
object test{
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[2]")
.appName("test")
.getOrCreate()
register()
val df = ad
.format("jdbc")
.option("driver","org.apache.hive.jdbc.HiveDriver")
.option("url","jdbc:hive2://xxx:10000/")
.option("user","hive")
.option("password",xxx)
.
option("fetchsize", "2000")
.option("dbtable","test.user_info")
.load()
df.show(10)
}
def register(): Unit = {
}
case object HiveSqlDialect extends JdbcDialect {
override def canHandle(url: String): Boolean = url.startsWith("jdbc:hive2")
override def quoteIdentifier(colName: String): String = {
colName.split('.').map(part => s"`$part`").mkString(".")
}
}
}
欢迎留⾔讨论和指正

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。