Processing Complex Data Types in Spark (Struct, Array, Map, JSON Strings, etc.)
Working with Complex Data Types
This is an excerpt from Chapter 6 of my personal translation of Spark: The Definitive Guide, but I felt the book did not go deep enough on this topic, so I have added some extra material.
Of course, more content can be found in my translation and study notes for the whole series, Spark The Definitive Guide Learning (Spark: The Definitive Guide).
Working with Structs
This type is like a struct in C: it can hold fields of different types. Using the same data as above, first create a DataFrame that contains a struct:
scala> val complexDF = df.selectExpr("struct(Description,InvoiceNo) as complex","Description","InvoiceNo")
scala> complexDF.printSchema
root
|-- complex: struct (nullable = false)
| |-- Description: string (nullable = true)
| |-- InvoiceNo: string (nullable = true)
|-- Description: string (nullable = true)
|-- InvoiceNo: string (nullable = true)
A complexDF containing this complex type is used just like any other DataFrame; the difference is in how you reach the fields inside the complex struct. There are a few ways:
complexDF.select(col("complex").getField("Description")).show(5,false) // getField works; getItem does too, though the two differ (see the sketch below)
complexDF.select("complex.Description").show(5,false) // or use dot notation directly; to select every field, use `.*`
// SQL: register the DataFrame as a temp view first
complexDF.createOrReplaceTempView("complex_df")
spark.sql("select complex.* from complex_df").show(5,false)
spark.sql("select complex.Description from complex_df").show(5,false)
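On the getField vs. getItem difference: for struct fields the two behave the same, but getField(fieldName: String) only accepts a field name, while getItem(key: Any) also accepts an array ordinal or a map key. A minimal sketch, reusing complexDF and df from above:
import org.apache.spark.sql.functions.{col, split}
// equivalent for struct fields
complexDF.select(col("complex").getField("Description")).show(2,false)
complexDF.select(col("complex").getItem("Description")).show(2,false)
// getItem can also index into an array (0-based) or look up a map key, which getField cannot
df.select(split(col("Description"), " ").getItem(0)).show(2,false)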
Working with Arrays
As its name implies, an array can only hold elements of a single type. Let's first create a DataFrame with an Array-typed column. The book introduces the string split function here, which splits a string around a regular-expression pattern passed as the second argument and returns an Array-typed Column.
def split(str: Column, pattern: String): Column. Splits str around pattern (pattern is a regular expression).
// scala
scala> import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.split
// split Description on spaces
scala> df.select(split(col("Description")," ")).printSchema
root
|-- split(Description, ): array (nullable = true)
| |-- element: string (containsNull = true)
scala> df.select(split(col("Description")," ")).show(2)
+---------------------+
|split(Description, )|
+---------------------+
| [WHITE, HANGING, ...|
| [WHITE, METAL, LA...|
+---------------------+
// the SQL equivalent: SELECT split(Description, ' ') FROM dfTable
Spark can treat this complex type like any other column, and the array can be queried with a Python-like indexing syntax:
scala> df.select(split(col("Description")," ").alias("array_col")).select(expr("array_col[0]")).show(2)
+------------+
|array_col[0]|
+------------+
| WHITE|
| WHITE|
+------------+
// in SQL: SELECT split(Description, ' ')[0] FROM dfTable
// getItem works as well
scala> df.select(split(col("Description")," ").alias("array_col")).select(col("array_col").getItem(0)).show(2)
To get the length of an array, use the size function (it works on maps too):
def size(e: Column): Column. Returns length of array or map.
scala> import org.apache.spark.sql.functions.size
import org.apache.spark.sql.functions.size
// here the Columns are written with the $ shorthand
scala> df.select(split($"Description", " ").alias("array_col")).withColumn("no_of_array",size($"array_col")).show(2,false)
+----------------------------------------+-----------+
|array_col |no_of_array|
+----------------------------------------+-----------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|5 |
|[WHITE, METAL, LANTERN] |3 |
+----------------------------------------+-----------+
To test whether an array contains a given element, use array_contains:
def array_contains(column: Column, value: Any): Column. Returns null if the array is null, true if the array contains value, and false otherwise.
It is mostly used as a where-clause predicate.
scala> import org.apache.spark.sql.functions.array_contains
import org.apache.spark.sql.functions.array_contains
scala> df.select(split(col("Description"), " ").alias("array_col")).withColumn("contains_WHITE",array_contains($"array_col","WHITE")).show(5,false)
+------------------------------------------+--------------+
|array_col |contains_WHITE|
+------------------------------------------+--------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER] |true |
|[WHITE, METAL, LANTERN] |true |
|[CREAM, CUPID, HEARTS, COAT, HANGER] |false |
|[KNITTED, UNION, FLAG, HOT, WATER, BOTTLE]|false |
|[RED, WOOLLY, HOTTIE, WHITE, HEART.] |true |
+------------------------------------------+--------------+
// the same works in SQL
scala> val df1 = df.select(split(col("Description"), " ").alias("array_col"))
df1: org.apache.spark.sql.DataFrame = [array_col: array<string>]
scala> df1.createOrReplaceTempView("array_df")
scala> spark.sql("select *, array_contains(array_col,'WHITE') from array_df").show(5,false)
+------------------------------------------+--------------------------------+
|array_col |array_contains(array_col, WHITE)|
+------------------------------------------+--------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER] |true |
|[WHITE, METAL, LANTERN] |true |
|[CREAM, CUPID, HEARTS, COAT, HANGER] |false |
|[KNITTED, UNION, FLAG, HOT, WATER, BOTTLE]|false |
|[RED, WOOLLY, HOTTIE, WHITE, HEART.] |true |
+------------------------------------------+--------------------------------+
// again, this is mostly used as a where condition; here is a quick example
val df2 = df.select(split(col("Description"), " ").alias("array_col")).withColumn("item",$"array_col".getItem(0))
// the second argument can also be a Column, testing row by row whether the array contains that row's value
df2.where("array_contains(array_col,item)").show(2) // the string form is actually parsed as an expr
df2.where(array_contains($"array_col",$"item")).show(2)
Note that in SQL, a column reference must not be wrapped in quotes; a quoted value is treated as a string literal. This is easy to forget.
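To make the pitfall concrete, a minimal sketch reusing df2 from above:
// unquoted item resolves to the column, as in the example above
df2.where("array_contains(array_col, item)").show(2)
// quoted 'item' is the literal string "item": this now asks whether the array contains the word "item"
df2.where("array_contains(array_col, 'item')").show(2)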
You can also use explode to turn a complex type into a set of rows: each element of the Array or Map is expanded into its own row, paired with the values of the other columns, as the examples below show.
def explode(e: Column): Column. Creates a new row for each element in the given array or map column.
scala> import org.apache.spark.sql.functions.explode
scala> df.withColumn("splitted", split(col("Description"), " "))
.withColumn("exploded", explode(col("splitted")))
.select("Description", "InvoiceNo", "exploded").show(2)
+--------------------+---------+--------+
| Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...|   536365|   WHITE|
|WHITE HANGING HEA...|   536365| HANGING|
+--------------------+---------+--------+
// here is a simpler example of my own
scala> val df4 = Seq((Seq(1,1,2),2),(Seq(1,2,3),3)).toDF("item","id")
df4: org.apache.spark.sql.DataFrame = [item: array<int>, id: int]
scala> df4.printSchema
root
|-- item: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- id: integer (nullable = false)
scala> df4.show()
+---------+---+
| item| id|
+---------+---+
|[1, 1, 2]| 2|
|[1, 2, 3]| 3|
+---------+---+
// each element of the Array is expanded into its own row, paired with the other columns
scala> df4.withColumn("exploded",explode($"item")).show
+---------+---+--------+
| item| id|exploded|
+---------+---+--------+
|[1, 1, 2]| 2| 1|
|[1, 1, 2]| 2| 1|
|[1, 1, 2]| 2| 2|
|[1, 2, 3]| 3| 1|
|[1, 2, 3]| 3| 2|
|[1, 2, 3]| 3| 3|
+---------+---+--------+
A few more functions, to supplement the above:
explode_outer: same as explode, but when the array or map is empty or null it still produces a row, with null in the exploded column
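A minimal sketch of the difference (df5 is a made-up DataFrame, not from the book):
scala> val df5 = Seq((Seq(1, 2), 1), (Seq.empty[Int], 2)).toDF("item", "id")
scala> import org.apache.spark.sql.functions.explode_outer
scala> df5.withColumn("exploded", explode_outer($"item")).show
// explode would drop the id = 2 row entirely; explode_outer keeps it, with exploded = null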
arrays_overlap(a1, a2)
Returns true if a1 and a2 share at least one non-null element; if they share none and either array contains null, returns null; otherwise false
spark.sql("select arrays_overlap(array(1,2,3),array(3,4,5))").show
true
spark.sql("select arrays_overlap(array(1,2,3),array(4,5))").show
false
spark.sql("select arrays_overlap(array(1,2,3),array(4,5,null))").show
null
arrays_zip(array<T>, array<U>, ...): array<struct<T, U, ...>>
Merges n arrays into one array of structs: the n-th struct holds the n-th value of every input array, with null where an input array is too short
scala> val df = spark.sql("select arrays_zip(array(1,2,3),array('4','5')) as array_zip")
scala> df.printSchema
root
|-- array_zip: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- 0: integer (nullable = true)
| | |-- 1: string (nullable = true)
scala> df.select(col("array_zip").getItem(0)).show
+------------+
|array_zip[0]|
+------------+
| [1, 4]|
+------------+
element_at(array<T>, Int): T and element_at(map<K, V>, K): V
Also works on maps, returning the value for the given key, or null if the key is absent. For arrays, the index is 1-based, a negative index counts from the end, and an out-of-range index returns null
scala> spark.sql("select element_at(array(1,2,3),-1)").showc语言如何创建字符串数组
+------------------------------+
|element_at(array(1, 2, 3), -1)|
+------------------------------+
| 3|
+------------------------------+
scala> spark.sql("select element_at(array(1,2,3),4)").show
+-----------------------------+
|element_at(array(1, 2, 3), 4)|
+-----------------------------+
| null|
+-----------------------------+
scala> spark.sql("select element_at(array(1,2,3),0)").show
java.lang.ArrayIndexOutOfBoundsException: SQL array indices start at 1
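The examples above only cover arrays, so here is a quick sketch of the map form (results shown as comments):
scala> spark.sql("select element_at(map(1, 'a', 2, 'b'), 2)").show   // b
scala> spark.sql("select element_at(map(1, 'a', 2, 'b'), 3)").show   // null, key 3 is absent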
There are a few more Array functions that do not lend themselves to screenshots, so I will list them here:
reverse(e: Column): Column, reverses a string or the order of elements in an array
Note: a string like "abc def" reverses to "fed cba"
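A quick sketch of both behaviors (results shown as comments):
spark.sql("select reverse('abc def')").show            // fed cba
spark.sql("select reverse(array(1,2,3))").show(false)  // [3, 2, 1]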
flatten(array<array<T>>): array<T>, flattens a nested array into a flat array; but if the nesting is more than two levels deep, only one level of nesting is removed
spark.sql("select flatten(array(array(1,2),array(3,4)))").show
[1, 2, 3, 4]
spark.sql("select flatten(array(array(array(1,2),array(3,4)),array(array(5,6))))").show(false)
[[1, 2], [3, 4], [5, 6]]
shuffle(e: Column): Column, returns a random permutation of the array
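A one-line sketch (the output order is random, so the result in the comment is just one possibility):
spark.sql("select shuffle(array(1,2,3,4))").show(false)  // e.g. [3, 1, 4, 2]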
slice(x: Column, start: Int, length: Int): Column, slices an array much like Python does, except that it returns the length elements of array x beginning at index start
If start is negative, counting begins from the end of the array. Indices start at 1; the sketch below should make it clear.
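A minimal sketch (results shown as comments):
spark.sql("select slice(array(1,2,3,4,5), 2, 3)").show(false)   // [2, 3, 4]: 3 elements starting at 1-based index 2
spark.sql("select slice(array(1,2,3,4,5), -2, 2)").show(false)  // [4, 5]: start counting from the end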