A summary of tips for reading and processing zip, gzip, Excel, and other files with Spark
I. When the file suffix is zip or gzip, Spark can read and handle the files automatically
1. Spark is quite smart here: given a batch of zip- or gzip-compressed files whose contents are plain text files, you can read them, or inspect the schema of the result, as follows.
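For example, a minimal sketch of this, assuming gzip-compressed text files under a placeholder path such as /data/input/*.gz:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("read-compressed-text").getOrCreate()

// Spark (via Hadoop) recognizes the .gz suffix and decompresses the files transparently
val df = spark.read.text("/data/input/*.gz")
df.printSchema()             // a single string column named "value"
df.show(5, truncate = false) // first few decompressed lines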
2. When the compressed text files contain JSON, you can also read them with read.json, which automatically parses the content and returns it as JSON data.
Spark reads file content line by line; if several lines of a file need to be combined into one record, set multiLine=true (the default is false).
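A sketch of both cases, reusing the spark session and placeholder paths from the sketch above; multiLine is only needed when a single JSON record spans several lines:

// Each line of the gzipped files is one JSON object: read.json decompresses and parses it directly
val jsonDf = spark.read.json("/data/json/*.gz")
jsonDf.printSchema()

// Each file holds one pretty-printed (multi-line) JSON document: enable multiLine
val multiLineDf = spark.read.option("multiLine", "true").json("/data/pretty-json/*.gz")
multiLineDf.show(5, truncate = false)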
3. When a zip or gzip file has no extension at all, or the wrong one, Spark can no longer read it automatically. In that case you can read it in a way similar to the following (the path below is a placeholder), and then decompress and parse the binary file data yourself in your own code:
spark.sparkContext.binaryFiles("/path/to/compressed/files").map(file => {
// parse the data: file._2 is a PortableDataStream; open it and decompress/parse the bytes yourself
})
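A fuller sketch of that approach, assuming the extension-less files are actually ZIP archives of UTF-8 text; the path and the line-oriented parsing are assumptions for illustration:

import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.util.zip.ZipInputStream
import scala.collection.mutable.ArrayBuffer

// binaryFiles yields (path, PortableDataStream) pairs without trying to decode the content
val lines = spark.sparkContext
  .binaryFiles("/data/unknown-suffix/*")
  .flatMap { case (_, stream) =>
    val zis = new ZipInputStream(stream.open())
    val out = ArrayBuffer[String]()
    var entry = zis.getNextEntry
    while (entry != null) {
      if (!entry.isDirectory) {
        // read the whole entry into memory; fine for modestly sized text entries
        val bytes = new ByteArrayOutputStream()
        val chunk = new Array[Byte](8192)
        var n = zis.read(chunk)
        while (n != -1) { bytes.write(chunk, 0, n); n = zis.read(chunk) }
        out ++= new String(bytes.toByteArray, StandardCharsets.UTF_8).split("\n")
      }
      entry = zis.getNextEntry
    }
    zis.close()
    out
  }

import spark.implicits._
val textDf = lines.toDF("value") // from here, parse as CSV/JSON however the data requires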
For reference, these are the options accepted by spark.read.csv (plus the generic file-source options at the end of the list) and their defaults, as documented in DataFrameReader; a short usage sketch follows the list.
sep:default `,`
encoding:default `UTF-8` decodes the CSV files by the given encoding type
quote:default `"` sets a single character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, you need to set not `null` but an empty string. This behaviour is different from com.databricks.spark.csv
escape:default `\` sets a single character used for escaping quotes inside an already quoted value.
charToEscapeQuoteEscaping:default `escape` or `\0`
comment:default empty string
header:default `false`
enforceSchema:default `true`
inferSchema:default `false`
samplingRatio:default is 1.0
ignoreLeadingWhiteSpace:default `false`
ignoreTrailingWhiteSpace:default `false`
nullValue:default empty string
emptyValue:default empty string
nanValue:default `NaN`
positiveInf:default `Inf`
negativeInf:default `-Inf`
dateFormat:default `yyyy-MM-dd`
timestampFormat:default `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]`
maxColumns:default `20480`
maxCharsPerColumn:default `-1`
unescapedQuoteHandling:default `STOP_AT_DELIMITER`
mode:default `PERMISSIVE`
columnNameOfCorruptRecord:default is the value specified in `spark.sql.columnNameOfCorruptRecord`
multiLine:default `false`
locale:default is `en-US`
lineSep:default covers all `\r`, `\r\n` and `\n`
pathGlobFilter:an optional glob pattern to only include files with paths matching the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`. It does not change the behavior of partition discovery.
modifiedBefore(batch only): an optional timestamp to only include files with modification times occurring before the specified Time. The provided timestamp must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
modifiedAfter(batch only):an optional timestamp to only include files with modification times occurring after the specified Time. The provided timestamp must be in the following form: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
recursiveFileLookup: recursively scan a directory for files. Using this option disables partition discovery
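A usage sketch combining a few of the options above; the path, separator, and formats are placeholders, and the spark session from the first sketch is reused:

val csvDf = spark.read
  .option("sep", ",")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NULL")
  .option("dateFormat", "yyyy-MM-dd")
  .option("pathGlobFilter", "*.csv.gz") // only pick up gzipped CSV files in the directory
  .csv("/data/csv/")
csvDf.printSchema()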
The option list above is taken from the Spark source itself; what follows is an excerpt of spark/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala for reference:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.Partition
import org.apache.spark.annotation.Stable
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions, UnivocityParser}
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils, FailureSafeParser}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsCatalogOptions, SupportsRead}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.csv._
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
/**
* Interface used to load a [[Dataset]] from external storage systems (e.g. file systems,
* key-value stores, etc). Use `SparkSession.read` to access this.
*
* @since 1.4.0
*/
@Stable
class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Specifies the input data source format.
*
* @since 1.4.0
*/
def format(source: String): DataFrameReader = {
this.source = source
this
}
/**
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can
* skip the schema inference step, and thus speed up data loading.
*
* @since 1.4.0
*/
def schema(schema: StructType): DataFrameReader = {
if (schema != null) {
val replaced = CharVarcharUtils.failIfHasCharVarchar(schema).asInstanceOf[StructType]
this.userSpecifiedSchema = Option(replaced)
}
this
}
/**
* Specifies the schema by using the input DDL-formatted string. Some data sources (e.g. JSON) can
* infer the input schema automatically from data. By specifying the schema here, the underlying
* data source can skip the schema inference step, and thus speed up data loading.
*
* {{{
* spark.read.schema("a INT, b STRING, c DOUBLE").csv("test.csv")
* }}}
*
* @since 2.3.0
*/
def schema(schemaString: String): DataFrameReader = {
schema(StructType.fromDDL(schemaString))
}
/**
* Adds an input option for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
* to be used to parse timestamps in the JSON/CSV datasources or partition values. The following
* formats of `timeZone` are supported:
* <ul>
* <li> Region-based zone ID: It should have the form 'area/city', such as
* 'America/Los_Angeles'.</li>
* <li> Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00'
* or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.</li>
* </ul>
* Other short names like 'CST' are not recommended to use because they can be ambiguous.
* If it isn't set, the current value of the SQL config `spark.sql.session.timeZone` is
* used by default.
* </li>
* </ul>
*
* @since 1.4.0
*/
def option(key: String, value: String): DataFrameReader = {
this.extraOptions = this.extraOptions + (key -> value)
this
}
/**
* Adds an input option for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* @since 2.0.0
*/
def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
/**
* Adds an input option for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* @since 2.0.0
*/
def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
/**
* Adds an input option for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* @since 2.0.0
*/
def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
/**
* (Scala-specific) Adds input options for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
* to be used to parse timestamps in the JSON/CSV datasources or partition values. The following
* formats of `timeZone` are supported:
* <ul>
* <li> Region-based zone ID: It should have the form 'area/city', such as
* 'America/Los_Angeles'.</li>
* <li> Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00'
* or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.</li>
* </ul>
* Other short names like 'CST' are not recommended to use because they can be ambiguous.
* If it isn't set, the current value of the SQL config `spark.sql.session.timeZone` is
* used by default.
* </li>
* </ul>
*
* @since 1.4.0
*/
def options(options: scala.collection.Map[String, String]): DataFrameReader = {
this.extraOptions ++= options
this
}
/**
* Adds input options for the underlying data source.
*
* All options are maintained in a case-insensitive way in terms of key names.
* If a new option has the same key case-insensitively, it will override the existing option.
*
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a time zone ID
* to be used to parse timestamps in the JSON/CSV datasources or partition values. The following
* formats of `timeZone` are supported:
* <ul>
* <li> Region-based zone ID: It should have the form 'area/city', such as
* 'America/Los_Angeles'.</li>
* <li> Zone offset: It should be in the format '(+|-)HH:mm', for example '-08:00'
* or '+01:00'. Also 'UTC' and 'Z' are supported as aliases of '+00:00'.</li>
* </ul>
* Other short names like 'CST' are not recommended to use because they can be ambiguous.
* If it isn't set, the current value of the SQL config `spark.sql.session.timeZone` is
* used by default.
* </li>
* </ul>
*
* @since 1.4.0
*/
def options(options: java.util.Map[String, String]): DataFrameReader = {
this.options(options.asScala)
this
}
/**
* Loads input in as a `DataFrame`, for data sources that don't require a path (e.g. external
* key-value stores).
*
* @since 1.4.0
*/
def load(): DataFrame = {
load(Seq.empty: _*) // force invocation of `load(...)`
}
/**
* Loads input in as a `DataFrame`, for data sources that require a path (e.g. data backed by
* a local or distributed file system).
*
* @since 1.4.0
*/
def load(path: String): DataFrame = {
// force invocation of `load(...)`
if (sparkSession.sessionState.conf.legacyPathOptionBehavior) {
option("path", path).load(Seq.empty: _*)
} else {
load(Seq(path): _*)
}
}
/**
* Loads input in as a `DataFrame`, for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
*
* @since 1.6.0
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw new AnalysisException("Hive data source can only be used with tables, you can not " +
"read files of Hive data source directly.")
}
val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
if (!legacyPathOptionBehavior &&
(extraOptions.contains("path") || extraOptions.contains("paths")) && paths.nonEmpty) {
throw new AnalysisException("There is a 'path' or 'paths' option set and load() is called " +
"with path parameters. Either remove the path option if it's the same as the path " +
"parameter, or add it to the load() parameter if you do want to read multiple paths. " +
s"To ignore this check, set '${SQLConf.LEGACY_PATH_OPTION_BEHAVIOR.key}' to 'true'.")
}
DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
val catalogManager = sparkSession.sessionState.catalogManager
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
val optionsWithPath = if (paths.isEmpty) {
extraOptions
} else if (paths.length == 1) {
extraOptions + ("path" -> paths.head)
} else {
val objectMapper = new ObjectMapper()
extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
}
val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
val (table, catalog, ident) = provider match {
case _: SupportsCatalogOptions if userSpecifiedSchema.nonEmpty =>
throw new IllegalArgumentException(
s"$source does not support user specified schema. Please don't specify the schema.")
case hasCatalog: SupportsCatalogOptions =>
val ident = hasCatalog.extractIdentifier(dsOptions)
val catalog = CatalogV2Util.getTableProviderCatalog(
hasCatalog,
catalogManager,
dsOptions)
(catalog.loadTable(ident), Some(catalog), Some(ident))
case _ =>
// TODO: Non-catalog paths for DSV2 are currently not well defined.
val tbl = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
(tbl, None, None)
}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
table match {
case _: SupportsRead if table.supports(BATCH_READ) =>
Dataset.ofRows(
sparkSession,
DataSourceV2Relation.create(table, catalog, ident, dsOptions))
case _ => loadV1Source(paths: _*)
}
}.getOrElse(loadV1Source(paths: _*))
}
private def loadV1Source(paths: String*) = {
val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) {
(Nil, extraOptions + ("path" -> paths.head))
} else {
(paths, extraOptions)
}
// Code path for data source v1.
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = finalPaths,
userSpecifiedSchema = userSpecifiedSchema,
className = source,
options = finalOptions.originalMap).resolveRelation())
}
/**
* Construct a `DataFrame` representing the database table accessible via JDBC URL
* url named table and connection properties.
*
* @since 1.4.0
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
assertNoSpecifiedSchema("jdbc")
// properties should override settings in extraOptions.
this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
format("jdbc").load()
}
/**
* Construct a `DataFrame` representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`.
* @param table Name of the table in the external database.
* @param columnName the name of a column of numeric, date, or timestamp type
* that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride.
* @param upperBound the maximum value of `columnName` used to decide partition stride.
* @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive),
* `upperBound` (exclusive), form partition strides for generated WHERE
* clause expressions used to split the column `columnName` evenly. When
* the input is less than 1, the number is set to 1.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included. "fetchsize" can be used to control the
* number of rows per fetch and "queryTimeout" can be used to wait
* for a Statement object to execute to the given number of seconds.
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
// columnName, lowerBound, upperBound and numPartitions override settings in extraOptions.
this.extraOptions ++= Map(
JDBCOptions.JDBC_PARTITION_COLUMN -> columnName,
JDBCOptions.JDBC_LOWER_BOUND -> lowerBound.toString,
JDBCOptions.JDBC_UPPER_BOUND -> upperBound.toString,
JDBCOptions.JDBC_NUM_PARTITIONS -> numPartitions.toString)
jdbc(url, table, connectionProperties)
}
/**
* Construct a `DataFrame` representing the database table accessible via JDBC URL
* url named table using connection properties. The `predicates` parameter gives a list
* expressions suitable for inclusion in WHERE clauses; each one defines one partition
* of the `DataFrame`.
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param predicates Condition in the where clause for each partition.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included. "fetchsize" can be used to control the
* number of rows per fetch.
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {
assertNoSpecifiedSchema("jdbc")
// connectionProperties should override settings in extraOptions.
val params = extraOptions ++ connectionProperties.asScala
val options = new JDBCOptions(url, table, params)
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
}
/**
* Loads a JSON file and returns the results as a `DataFrame`.
*
* See the documentation on the overloaded `json()` method with varargs for more details.
*
* @since 1.4.0
*/
def json(path: String): DataFrame = {
// This method ensures that calls that explicit need single argument works, see SPARK-16009
json(Seq(path): _*)
}
/**
* Loads JSON files and returns the results as a `DataFrame`.
*
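To tie the excerpt back to everyday use, here is a hedged sketch of the partitioned jdbc read documented above; the URL, table, column, bounds, and credentials are all placeholders:

import java.util.Properties

val connProps = new Properties()
connProps.setProperty("user", "report_user") // placeholder credentials
connProps.setProperty("password", "secret")
connProps.setProperty("fetchsize", "1000")

// Splits the scan of table "orders" into 8 partitions by generating WHERE clauses
// over the numeric column "id" in the range [1, 1000000)
val ordersDf = spark.read.jdbc(
  "jdbc:postgresql://db-host:5432/shop", // placeholder JDBC URL
  "orders",
  "id",      // partition column
  1L,        // lowerBound (inclusive)
  1000000L,  // upperBound (exclusive)
  8,         // numPartitions
  connProps)
ordersDf.printSchema()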