Spark SQL Join Types
Spark DataFrame supports all of the basic SQL join types: INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, and SELF JOIN. Spark SQL joins are wide transformations: the result data is shuffled across the network, so joins can cause very large performance problems when not designed carefully.
On the other hand, Spark SQL joins come with more optimizations by default (thanks to DataFrame & Dataset); even so, there are still some performance concerns to consider when using them.
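To see the shuffle for yourself, you can print a join's physical plan and look for Exchange operators. Below is a minimal sketch using the empDF and deptDF DataFrames created later in this article; note that for small inputs Spark may choose a broadcast join instead, which avoids shuffling the larger side:
// Shuffles appear as Exchange operators in the physical plan.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .explain()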
In this article, you will learn the different join syntaxes and how to use the different join types on DataFrame and Dataset, with examples in Scala.
SQL Join Types and Syntax
Listed below are all the Spark SQL join syntaxes:
join(right: Dataset[_]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
The rest of the tutorial explains these six syntaxes, which take the right Dataset, a join expression, and the join type as a string.
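As an illustration of how a few of these variants differ, here is a minimal sketch; df1 and df2 are hypothetical DataFrames that share an id column, not part of the examples below:
// Syntax 2: join on a column name that exists in both DataFrames;
// the id column appears only once in the result.
df1.join(df2, "id")

// Syntax 4: a list of column names plus a join type string.
df1.join(df2, Seq("id"), "left_outer")

// Syntax 6: an explicit join expression plus a join type string;
// here both id columns are kept in the result.
df1.join(df2, df1("id") === df2("id"), "inner")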
For syntaxes 4 & 6 (the two that take a join type) you can use either a "JoinType" or a "Join String" from the table below for the joinType string argument. When you use "JoinType", you should import org.apache.spark.sql.catalyst.plans._, as that package defines the JoinType objects.
JOINTYPE        JOIN STRING                          EQUIVALENT SQL JOIN
Inner.sql       inner                                INNER JOIN
FullOuter.sql   outer, full, fullouter, full_outer   FULL OUTER JOIN
LeftOuter.sql   left, leftouter, left_outer          LEFT JOIN
RightOuter.sql  right, rightouter, right_outer       RIGHT JOIN
Cross.sql       cross
LeftAnti.sql    anti, leftanti, left_anti
LeftSemi.sql    semi, leftsemi, left_semi
All of these join types have JoinType objects defined; to use them you need to import org.apache.spark.sql.catalyst.plans.{LeftOuter, Inner, ....}.
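As a sketch of what that looks like (assuming a Spark version where these catalyst objects are accessible; they are internal API, so the plain join strings from the table above are usually the safer choice), each JoinType object exposes its name through its sql method, and this example uses the empDF and deptDF DataFrames created below:
import org.apache.spark.sql.catalyst.plans.Inner

// Inner.sql returns "INNER"; join type strings are case-insensitive,
// so it can be passed wherever a join string is expected.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), Inner.sql)
  .show(false)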
Before we jump into the Spark SQL join examples, first, let's create the emp and dept DataFrames. Here the emp_id column is unique in emp, dept_id is unique in the dept dataset, and emp_dept_id in emp references dept_id in the dept dataset.
val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined",
"emp_dept_id","gender","salary")
import spark.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)
Emp Dataset
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Dept Dataset
+---------+-------+
|dept_name|dept_id|
+---------+-------+
|Finance  |10     |
|Marketing|20     |
|Sales    |30     |
|IT       |40     |
+---------+-------+
Inner Join
Inner join is the default join in Spark and is the most commonly used. It joins two DataFrames/Datasets on the specified columns, and rows without a match in both datasets are dropped.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .show(false)
When we apply inner join to our datasets, it drops emp_dept_id 50 from emp and dept_id 30 from dept. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Full Outer Join
Outer, also called full or fullouter, join returns all rows from both Spark DataFrames/Datasets; where the join expression doesn't match, it returns null for the respective columns.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "outer").show(false)
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "full").show(false)
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "fullouter").show(false)
Left Outer Join
Spark left, the same as left outer join, returns all rows from the left DataFrame/Dataset regardless of whether a match is found on the right dataset; when the join expression doesn't match, it assigns null for that record and drops the records from the right that found no match.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "left")
  .show(false)
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "leftouter")
  .show(false)
In our dataset, emp_dept_id 50 has no record in dept, hence this record contains null on the dept columns (dept_name & dept_id), and dept_id 30 from dept has been dropped. Below is the result of the above join expression.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |null     |null   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Right Outer Join
Spark right, the same as right outer join, is the opposite of left join: it returns all rows from the right DataFrame/Dataset regardless of whether a match is found on the left dataset; when the join expression doesn't match, it assigns null for that record and drops the records from the left that found no match.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "right")
  .show(false)
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "rightouter")
  .show(false)
In our example, the right dataset's dept_id 30 has no record in the left dataset emp, hence this record contains null on the emp columns. Also, emp_dept_id 50 found no match and has been dropped. Below is the output.
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|null  |null    |null           |null       |null       |null  |null  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
Left Semi Join
Spark leftsemi join is similar to inner join; the difference is that a leftsemi join returns all columns from the left dataset and ignores all columns from the right dataset. In other words, this join returns only the rows from the left dataset that have a match in the right dataset; non-matching rows, on either side, are ignored.
The same result can be achieved by using select on the result of an inner join; however, using this join is more efficient.
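For comparison, here is a rough sketch of that inner join plus select equivalent; the leftsemi form below expresses the same thing more efficiently:
// Keep only the left-side (emp) columns after an inner join.
// distinct() guards against duplicated emp rows when one key
// matches several dept rows (not the case in this data).
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .select(empDF.columns.map(c => empDF(c)): _*)
  .distinct()
  .show(false)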
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "leftsemi")
  .show(false)
Output
leftsemi join
+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+
Left Anti Join
Left anti join does the exact opposite of Spark leftsemi join; a leftanti join returns only the columns from the left dataset for records that have no match in the right dataset.
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "leftanti")
  .show(false)
Output
+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+
Self Join
Spark joins wouldn't be complete without a self join. Though there is no self-join type available, we can use any of the join types explained above to join a DataFrame to itself. Below is an inner self join.
import org.apache.spark.sql.functions.col

empDF.as("emp1").join(empDF.as("emp2"),
    col("emp1.superior_emp_id") === col("emp2.emp_id"), "inner")
  .select(col("emp1.emp_id"), col("emp1.name"),
    col("emp2.emp_id").as("superior_emp_id"),
    col("emp2.name").as("superior_emp_name"))
  .show(false)
Here we join the emp dataset with itself to find out the superior emp_id and name for all employees.
+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+
Using SQL Expression
Since Spark SQL supports native SQL syntax, we can also write join operations after creating temporary tables, using spark.sql().
Source code:
empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

//SQL JOIN
val joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id")
joinDF.show(false)

val joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id")
joinDF2.show(false)
Below is the complete example of the joins discussed above.

import org.apache.spark.sql.SparkSession

object JoinExample extends App {

val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("SparkByExamples")
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

val emp = Seq((1,"Smith",-1,"2018","10","M",3000),
(2,"Rose",1,"2010","20","M",4000),
(3,"Williams",1,"2010","10","M",1000),
(4,"Jones",2,"2005","10","F",2000),
(5,"Brown",2,"2010","40","",-1),
(6,"Brown",2,"2010","50","",-1)
)
val empColumns = Seq("emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary")
import spark.implicits._
val empDF = emp.toDF(empColumns:_*)
empDF.show(false)
val dept = Seq(("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
)
val deptColumns = Seq("dept_name","dept_id")
val deptDF = dept.toDF(deptColumns:_*)
deptDF.show(false)

println("Inner join")
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "inner")
  .show(false)
println ("Outer join")
empDF .join (deptDF ,empDF ("emp_dept_id") ===  deptDF ("dept_id"),"outer")
.
show (false )
println ("full join")
empDF .join (deptDF ,empDF ("emp_dept_id") ===  deptDF ("dept_id"),"full")
.show (false )
println ("fullouter join")
empDF .join (deptDF ,empDF ("emp_dept_id") ===  deptDF ("dept_id"),"fullouter")
.show (false )
println ("right join")
empDF .join (deptDF ,empDF ("emp_dept_id") ===  deptDF ("dept_id"),"right")
.show (false )
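// A sketch rounding out the example: the remaining join types
// shown earlier in this article follow the same pattern.
println("leftsemi join")
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "leftsemi")
  .show(false)

println("leftanti join")
empDF.join(deptDF, empDF("emp_dept_id") === deptDF("dept_id"), "leftanti")
  .show(false)
}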
