spark百万写⼊mysql数据量⼤_第⼋篇SparkSQL百万级数据批
量读写⼊MySQL
Spark SQL读取MySQL的⽅式
Spark SQL还包括⼀个可以使⽤JDBC从其他数据库读取数据的数据源。与使⽤JdbcRDD相⽐,应优先使⽤此功能。这是因为结果作为DataFrame返回,它们可以在Spark SQL中轻松处理或与其他数据源连接。JDBC数据源也更易于使⽤Java或Python,因为它不需要⽤户提供ClassTag。
可以使⽤Data Sources API将远程数据库中的表加载为DataFrame或Spark SQL临时视图。⽤户可以在数据源选项中指定JDBC连接属性。user和password通常作为⽤于登录数据源的连接属性。除连接属性外,Spark还⽀持以下不区分⼤⼩写的选项:
属性名称
解释
url
要连接的JDBC URL
dbtable
读取或写⼊的JDBC表
query
指定查询语句
driver
⽤于连接到该URL的JDBC驱动类名
partitionColumn, lowerBound, upperBound
如果指定了这些选项,则必须全部指定。另外, numPartitions必须指定
numPartitions
表读写中可⽤于并⾏处理的最⼤分区数。这也确定了并发JDBC连接的最⼤数量。如果要写⼊的分区数超过此限制,我们可以通过
coalesce(numPartitions)在写⼊之前进⾏调⽤将其降低到此限制
queryTimeout
默认为0,查询超时时间
fetchsize
JDBC的获取⼤⼩,它确定每次要获取多少⾏。这可以帮助提⾼JDBC驱动程序的性能
batchsize
默认为1000,JDBC批处理⼤⼩,这可以帮助提⾼JDBC驱动程序的性能。
isolationLevel
mysql怎么读英语事务隔离级别,适⽤于当前连接。它可以是⼀个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。此选项仅适⽤于写作。
sessionInitStatement
在向远程数据库打开每个数据库会话之后,在开始读取数据之前,此选项将执⾏⾃定义SQL语句,使
⽤它来实现会话初始化代码。
truncate
这是与JDBC writer相关的选项。当SaveMode.Overwrite启⽤时,就会清空⽬标表的内容,⽽不是删除和重建其现有的表。默认为false pushDownPredicate
⽤于启⽤或禁⽤谓词下推到JDBC数据源的选项。默认值为true,在这种情况下,Spark将尽可能将过滤器下推到JDBC数据源。
源码
SparkSession
/**
* Returns a [[DataFrameReader]] that can be used to read non-streaming data in as a
* `DataFrame`.
* {{{
* ad.parquet("/path/to/file.parquet")
* ad.schema(schema).json("/path/to/file.json")
* }}}
*
* @since 2.0.0
*/
def read: DataFrameReader = new DataFrameReader(self)
DataFrameReader
// ...省略代码...
/**
*所有的数据由RDD的⼀个分区处理,如果你这个表很⼤,很可能会出现OOM
bind函数要求的地址类型是*可以使⽤DataFrameDF.rdd.partitions.size⽅法查看
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
assertNoSpecifiedSchema("jdbc")
format("jdbc").load()
}
/**
* @param url 数据库url
* @param table 表名
* @param columnName 分区字段名
* @param lowerBound `columnName`的最⼩值,⽤于分区步长
* @param upperBound `columnName`的最⼤值,⽤于分区步长.二次函数的解析式怎么求
* @param numPartitions 分区数量
* @param connectionProperties 其他参数
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
JDBCOptions.JDBC_PARTITION_COLUMN -> columnName, JDBCOptions.JDBC_LOWER_BOUND -> String, JDBCOptions.JDBC_UPPER_BOUND -> String, JDBCOptions.JDBC_NUM_PARTITIONS -> String)
jdbc(url, table, connectionProperties)
}
/**
* @param predicates 每个分区的where条件
* ⽐如:"id <= 1000", "score > 1000 and score <= 2000"
* 将会分成两个分区
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = { assertNoSpecifiedSchema("jdbc")
val params = Map ++ Map val options = new JDBCOptions(url, table, params)
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
val relation = JDBCRelation(parts, options)(sparkSession)
sparkSession.baseRelationToDataFrame(relation)
}
⽰例
private def runJdbcDatasetExample(spark: SparkSession): Unit = {
// 从JDBC source加载数据(load)
val jdbcDF = ad
.format("jdbc")
.
option("url", "jdbc:mysql://127.0.0.1:3306/test")
.option("dbtable", "mytable")
.option("user", "root")
.option("password", "root")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "root")
val jdbcDF2 = ad
.jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
// 指定读取schema的数据类型
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = ad
.jdbc("jdbc:mysql://127.0.0.1:3306/test", "mytable", connectionProperties)
}
值得注意的是,上⾯的⽅式如果不指定分区的话,Spark默认会使⽤⼀个分区读取数据,这样在数据量特别⼤的情况下,会出现OOM。在读取数据之后,调⽤DataFrameDF.rdd.partitions.size⽅法可以查看分区数。
Spark SQL批量写⼊MySQL
代码⽰例如下:
object BatchInsertMySQL{
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
通用对话框commondialog1// 创建sparkSession对象
val conf = new SparkConf()
.setAppName("BatchInsertMySQL")
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
import spark.implicits._
// MySQL连接参数
val url = JDBCUtils.url
val user = JDBCUtils.user
val pwd = JDBCUtils.password
/
/ 创建Properties对象,设置连接mysql的⽤户名和密码
val properties: Properties = new Properties()
properties.setProperty("user", user) // ⽤户名
properties.setProperty("password", pwd) // 密码
properties.setProperty("driver", "sql.jdbc.Driver") properties.setProperty("numPartitions","10")
// 读取mysql中的表数据
val testDF: DataFrame = ad.jdbc(url, "test", properties) println("testDF的分区数: " + testDF.rdd.partitions.size)
testDF.persist(StorageLevel.MEMORY_AND_DISK)javascript该怎么学
testDF.printSchema()
val result =
s"""-- SQL代码
""".stripMargin
val resultBatch = spark.sql(result).as[Person]
println("resultBatch的分区数: " + resultBatch.rdd.partitions.size) // 批量写⼊MySQL
ios直接从网页下载视频// 此处最好对处理的结果进⾏⼀次重分区
// 由于数据量特别⼤,会造成每个分区数据特别多
val list = new ListBuffer[Person]
record.foreach(person => {
val name = Person.name
val age = Person.age
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论