Common Configuration Options for Reading and Writing CSV in Spark
Before Spark 2.0, reading and writing CSV files with Spark SQL required the spark-csv library provided by Databricks. Since Spark 2.0, Spark SQL supports reading and writing CSV files natively.
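For contrast, here is a minimal sketch of both styles. The object name and the package coordinates in the comment are illustrative; the file path matches the examples below:

import org.apache.spark.sql.SparkSession

object CsvApiComparison {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV API Comparison")
      .master("local")
      .getOrCreate()

    // Before Spark 2.0: the external spark-csv package, added e.g. via
    // spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
    val oldStyle = spark.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("test-in/csv/csv_with_header.csv")

    // Since Spark 2.0: the built-in csv source, with a .csv(...) shorthand
    val newStyle = spark.read
      .option("header", "true")
      .csv("test-in/csv/csv_with_header.csv")

    newStyle.show()
  }
}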
A test file with a header row:
id|name|age
1| darren |18
2|anne|18
3|"test"|18
4|'test2'|\N
package com.darren.spark.sql.csv

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @Author Darren Zhang
  * @Date 2019-05-30
  * @Description Reads a pipe-delimited CSV file that has a header row
  **/
object CSVReader {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val result = spark.read.format("csv")
      .option("delimiter", "|")
      .option("header", "true")
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      .load("test-in/csv/csv_with_header.csv")

    result.show()
    result.printSchema()
  }
}
The output is as follows:
+---+--------+----+
| id|    name| age|
+---+--------+----+
|  1| darren |  18|
|  2|    anne|  18|
|  3|  "test"|  18|
|  4|   test2|null|
+---+--------+----+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
Option descriptions:
delimiter: the field delimiter; defaults to a comma (,)
nullValue: a string that represents a null value
quote: the quote character; defaults to a double quote (")
header: whether the first line is a header row rather than data
inferSchema: whether to infer column types automatically
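These options don't have to be chained one by one; DataFrameReader also accepts them all at once via options(Map(...)). A minimal sketch, equivalent to the reader above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CSV Reader")
  .master("local")
  .getOrCreate()

// Collect all read options in one map instead of chaining .option(...) calls
val csvOptions = Map(
  "delimiter"   -> "|",
  "header"      -> "true",
  "quote"       -> "'",
  "nullValue"   -> "\\N",
  "inferSchema" -> "true"
)

val result = spark.read
  .options(csvOptions)
  .csv("test-in/csv/csv_with_header.csv")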
Inferring column types is only a stopgap; the better approach is to specify the types explicitly:
package com.darren.spark.sql.csv
/**
* @Author Darren Zhang
* @Date 2019-05-30
* @Description TODO
**/
case class User(id: Int, name: String, age: Int)
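Beyond supplying a schema, the case class can also give you a typed Dataset[User] via as[User]. A minimal sketch building on the reader above; note that age: Int is a primitive and cannot hold null (row 4 would fail when the Dataset is materialized), so a genuinely nullable column would need Option[Int]:

import org.apache.spark.sql.SparkSession

object CSVDatasetReader {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Dataset Reader")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Column names and types must line up with the User case class
    val users = spark.read
      .option("delimiter", "|")
      .option("header", "true")
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      .csv("test-in/csv/csv_with_header.csv")
      .as[User]

    users.show()
  }
}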
A test file without a header row:
1| darren |18
2|anne|18
3|"test"|18
4|'test2'|\N
package com.darren.spark.sql.csv

import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @Author Darren Zhang
  * @Date 2019-05-30
  * @Description Reads a pipe-delimited CSV file without a header row and names the columns via toDF
  **/
object CSVReader {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val result = spark.read.format("csv")
      .option("delimiter", "|")
      //.option("header", "true")
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      .load("test-in/csv/csv_without_header.csv")
      .toDF("id", "name", "age")

    result.show()
    result.printSchema()
  }
}
+---+--------+----+
| id|    name| age|
+---+--------+----+
|  1| darren |  18|
|  2|    anne|  18|
|  3|  "test"|  18|
|  4|   test2|null|
+---+--------+----+
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
Specifying the schema explicitly:
package com.darren.spark.sql.csv

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @Author Darren Zhang
  * @Date 2019-05-30
  * @Description Reads a pipe-delimited CSV file using a schema derived from the User case class
  **/
object CSVReader {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV Reader")
      .master("local")
      .getOrCreate()

    val result = spark.read.format("csv")
      .option("delimiter", "|")
      //.option("header", "true")
      .option("quote", "'")
      .option("nullValue", "\\N")
      .option("inferSchema", "true")
      .schema(ScalaReflection.schemaFor[User].dataType.asInstanceOf[StructType])
      .load("test-in/csv/csv_without_header.csv")
      //.toDF("id", "name", "age")

    result.show()
    result.printSchema()
  }
}
The result is the same as above.
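If you would rather not go through ScalaReflection, the same schema can be spelled out by hand with StructType/StructField. A minimal sketch, assuming the SparkSession named spark from the examples above:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hand-written equivalent of ScalaReflection.schemaFor[User]
val userSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)
))

val result = spark.read
  .option("delimiter", "|")
  .option("quote", "'")
  .option("nullValue", "\\N")
  .schema(userSchema)
  .csv("test-in/csv/csv_without_header.csv")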
Writing a CSV file:
result.write
  .mode(SaveMode.Overwrite)
  .option("delimiter", "|")
  //.option("quote", "")
  .format("csv")
  .save("test-out/csv/")
1|darren|18
2|anne|18
3|"\"test\""|18
4|test2|
The file contents differ from the read output in two ways.
First change: the written file adds double quotes, and doubles up on quotes that are already present ("test" is written as "\"test\""), because the double quote is the default quote character. If you don't want them added, uncomment the quote option above and set it to the empty string.
Second change: the spaces around darren are gone. When saving CSV files with Spark SQL in Spark 2.1.1, leading and trailing whitespace in string values is trimmed by default.
This default behavior is not always what we want. Since Spark 2.2.0 it can be switched off through options:
result.write
  .mode(SaveMode.Overwrite)
  .option("delimiter", "|")
  //.option("quote", "")
  .option("ignoreLeadingWhiteSpace", false)
  .option("ignoreTrailingWhiteSpace", false)
  .option("nullValue", null)
  .format("csv")
  .save("test-out/csv/")
Option descriptions:
ignoreLeadingWhiteSpace: whether to trim leading whitespace (defaults to true when writing)
ignoreTrailingWhiteSpace: whether to trim trailing whitespace (defaults to true when writing)
nullValue: the string written for null values; if you don't want any marker at all, set it to null
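A quick way to confirm the write options took effect is to read the output directory back. A sketch, assuming the spark session from the examples above; note that ignoreLeadingWhiteSpace and ignoreTrailingWhiteSpace default to false on the read side, so the preserved spaces around darren survive the round trip:

// Read back what was just written to verify delimiter, whitespace and nulls
val roundTrip = spark.read
  .option("delimiter", "|")
  .csv("test-out/csv/")

roundTrip.show()
roundTrip.printSchema()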
Question: so how many options can actually be set when Spark reads and writes CSV?
Answer: I haven't found any documentation that lists them all, but from the source code we can work out exactly which ones exist.
The source code is as follows:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
class CSVOptions(
    @transient private val parameters: CaseInsensitiveMap[String],
    defaultTimeZoneId: String,
    defaultColumnNameOfCorruptRecord: String)
  extends Logging with Serializable {

  def this(
      parameters: Map[String, String],
      defaultTimeZoneId: String,
      defaultColumnNameOfCorruptRecord: String = "") = {
    this(
      CaseInsensitiveMap(parameters),
      defaultTimeZoneId,
      defaultColumnNameOfCorruptRecord)
  }
  private def getChar(paramName: String, default: Char): Char = {
    val paramValue = parameters.get(paramName)
    paramValue match {
      case None => default
      case Some(null) => default
      case Some(value) if value.length == 0 => '\u0000'
      case Some(value) if value.length == 1 => value.charAt(0)
      case _ => throw new RuntimeException(s"$paramName cannot be more than one character")
    }
  }
  private def getInt(paramName: String, default: Int): Int = {
    val paramValue = parameters.get(paramName)
    paramValue match {
      case None => default
      case Some(null) => default
      case Some(value) => try {
        value.toInt
      } catch {
        case e: NumberFormatException =>
          throw new RuntimeException(s"$paramName should be an integer. Found $value")
      }
    }
  }
  private def getBool(paramName: String, default: Boolean = false): Boolean = {
    val param = parameters.getOrElse(paramName, default.toString)
    if (param == null) {
      default
    } else if (param.toLowerCase(Locale.ROOT) == "true") {
      true
    } else if (param.toLowerCase(Locale.ROOT) == "false") {
      false
    } else {
      throw new Exception(s"$paramName flag can be true or false")
    }
  }

  // ... the rest of the class, where each supported option is defined in
  // terms of these getters, is omitted here
}

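One practical consequence of getChar above: an option value longer than one character is rejected outright, while an empty string becomes '\u0000', which is why the commented-out quote option in the write example effectively disables quoting. In the full class, quote is parsed through getChar, so a read like the following fails fast; a hypothetical sketch, assuming the spark session from the examples above:

// Fails when the read is resolved: "quote cannot be more than one character"
val broken = spark.read
  .option("delimiter", "|")
  .option("quote", "''")  // two characters -> RuntimeException from getChar
  .csv("test-in/csv/csv_with_header.csv")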