sparksql导出数据
如果⽤户希望在spark sql 中,执⾏某个sql 后,将其结果集保存到本地,并且指定csv 或者 json 格式,在 beeline 中,实现起来很⿇烦。通常的做法是将其create table tempTable as *** ,通过将结果集写⼊到新的临时表中,进⾏保存,然后再通过其他⽅式export 到本地。
scala python这种⽅式,对于 HDFS 是可⾏到,但是如果数据是保存在像SequoiaDB 中,就⽐较难办了。因为spark 向 SequoiaDB 写⼊记录时,可能部分task 会失败重试,这样就容易造成SequoiaDB ⽬标表中写⼊了重复记录,从⽽造成数据不准确的问题。
因此,需要寻⼀种的新的⽅式,将其结果集准确地读取出来,并且写⼊本地⽂件。
在⽹上有很多替代⽅案,⽆外乎是通过beeline 或者 spark-sql ,执⾏ SQL 命令,通过重定向的⽅式,将结果集保存到指定⽂件中。
这样的⽅式,⾸先不讨论其输出格式的问题,最⽆法让⼈接受的是,spark-sql 需要将所有的结果数据收集到⼀个 Driver 进程中后,才会开始输出终端。这个过程有以下 3 个问题
1. 时间久,如果数据量⼤了,Driver 收集的过程会很久,并且通过top 可以查看到进程CPU 飙升
2. 容易OOM,当数据量增⼤后,因为需要将所有结果数据存储在内存中,⼀旦数据量⽤超了,就抛出 OOM 的错误,⼀切前功尽弃
3. 输出格式,因为保存本地⽂件的内容就是输出终端的数据,CSV 格式不友好,有时候甚⾄会因为不可见字符⽽导致整个本地⽂件格式
错乱,最终导致数据⽆法恢复
所以本⽂主要是向读者们介绍⼀种新的⽅式,直接使⽤ scala / python 语⾔开发的程序,利⽤ RDD 将其结果数据保存本地,输出格式⽀持CSV 和 JSON。
scala 版本
scala 版本作者没有直接编写程序,但是通过 spark-shell 进⾏了验证
import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM test_sdb")
如果有⽤户喜欢这个⽅式,可以考虑将程序打包成jar 包来执⾏。
导出格式的更多参数,请参考 python 版本
python 版本
在执⾏python 的脚本前,⾸先需要设置⼀下环境变量
export SPARK_HOME=/root/software/spark-2.1.1-bin-hadoop2.7
export PYTHONPATH=${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip;
注意:py4j-0.10.4-src.zip ⽂件名可能随不同的spark 版本有所变化
然后准备以下脚本程序, spark_sql_export.py
import atexit
import os
import platform
import pyspark
t import SparkContext
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession \
.builder \
.enableHiveSupport() \
.getOrCreate()
df = spark.sql("SELECT * FROM test_sdb limit 100")
#df.coalesce(1).write.format("org.apache.spark.sql.json").mode("overwrite") \
# .save("/opt/sequoiadb/chenfool")
.option("enforceSchema", "false") \
.option("quoteAll", "true") \
.option("escapeQuotes", "false") \
.option("header", "true") \
.option("delimiter", "|") \
.option("charToEscapeQuoteEscaping", "\"") \
.option("inferSchema", "true") \
.option("ignoreLeadingWhiteSpace", "false") \
.option("ignoreTrailingWhiteSpace", "false") \
.
save("/opt/sequoiadb/chenfool")
执⾏⽅式
python spark_sql_export.py
结果数据就会被保存在 /opt/sequoiadb/chenfool/part-00000* ⽂件中。
结果数据只会被保存在⼀个⽂件中,因为设置了 coalesce 参数。
JSON 格式请参考 spark_sql_export.py 注释部分。
CSV 的详细参数,可以参考spark 源码:
${SPARK_HOME}/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
注意:
在spark 2.1.1 版本中,ignoreLeadingWhiteSpace 和 ignoreTrailingWhiteSpace 参数⽆法⽣效,默认值为:true。在 spark 2.4.0 版本中,经过测试,这两个参数才能够⽣效。如果要求保存的数据中不做 trim 操作,只能够将spark 升级为2.4.0 版本。
本博客参考了之前的内容,⾥⾯有介绍如果利⽤python 来执⾏spark 的程序的说明,感兴趣的读者们可以移步查阅
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论