SparkSQL: Batch Insert-or-Update (Upsert) of Data into PostgreSQL
When saving a DataFrame to a database, Spark SQL only offers the Append, Overwrite, ErrorIfExists and Ignore save modes, none of which meets the project requirement here. Starting from the Spark save source code, the write path is therefore reworked so that data is saved in batches with upsert semantics: existing rows are updated, missing rows are inserted.
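For comparison, this is all the built-in JDBC writer offers out of the box; a minimal sketch, assuming df is the DataFrame to be written and with placeholder connection values:

import org.apache.spark.sql.SaveMode

df.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/demo")   // placeholder connection values
  .option("dbtable", "test_001_copy1")
  .option("user", "demo")
  .option("password", "demo")
  .mode(SaveMode.Append)   // Append / Overwrite / ErrorIfExists / Ignore -- no per-row "insert or update"
  .save()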
/**
 * Test case
 * Batch-save data: update the row if it exists, insert it otherwise
 * INSERT INTO test_001 VALUES (?, ?, ?)
 * ON CONFLICT (id) DO
 * UPDATE SET id = ?, name = ?, age = ?;
 * @author linzhy
 */
object InsertOrUpdateTest {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .config("spark.debug.maxToStringFields", "100")
      .getOrCreate()

    val config = ConfigFactory.load()
    val ods_url = config.getString("pg.oucloud_ods.url")
    val ods_user = config.getString("pg.oucloud_ods.user")
    val ods_password = config.getString("pg.oucloud_ods.password")

    val test_001 = spark.read.format("jdbc")
      .option("url", ods_url)
      .option("dbtable", "test_001")
      .option("user", ods_user)
      .option("password", ods_password)
      .load()

    test_001.createOrReplaceTempView("test_001")

    val sql =
      """
        |SELECT * FROM test_001
        |""".stripMargin

    val dataFrame = spark.sql(sql)

    // Batch-save data: update if the row exists, insert otherwise
    PgSqlUtil.insertOrUpdateToPgsql(dataFrame, spark.sparkContext, "test_001_copy1", "id")

    spark.stop()
  }
}
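ConfigFactory.load() comes from Typesafe Config and by default reads an application.conf on the classpath. A minimal sketch of the assumed file, matching the key names used above (the values are placeholders, not the original settings):

pg {
  oucloud_ods {
    url      = "jdbc:postgresql://localhost:5432/oucloud_ods"
    user     = "postgres"
    password = "postgres"
  }
}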
Source code of the insertOrUpdateToPgsql method
/**
 * Batch insert or update data; the method is adapted from the Spark.write.save() source code
 * @param dataFrame data to be written
 * @param sc        SparkContext, used to broadcast the PreparedStatement
 * @param table     target PostgreSQL table
 * @param id        conflict column (primary key) used in the ON CONFLICT clause
 */
def insertOrUpdateToPgsql(dataFrame: DataFrame, sc: SparkContext, table: String, id: String): Unit = {

  val tableSchema = dataFrame.schema
  val columns = tableSchema.fields.map(x => x.name).mkString(",")
  val placeholders = tableSchema.fields.map(_ => "?").mkString(",")
  val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) on conflict($id) do update set "
  // The UPDATE SET part repeats every column with its own placeholder
  val update = tableSchema.fields.map(x =>
    x.name + "=?"
  ).mkString(",")

  val realsql = sql.concat(update)

  val conn = connectionPool()
  conn.setAutoCommit(false)
  val dialect = JdbcDialects.get(conn.getMetaData.getURL)
  val broad_ps = sc.broadcast(conn.prepareStatement(realsql))

  // Every column appears twice: once in VALUES (...), once in UPDATE SET ...
  val numFields = tableSchema.fields.length * 2

  // Lifted from the Spark source: per-field JDBC null types and value setters
  val nullTypes = tableSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType)
  val setters = tableSchema.fields.map(f => makeSetter(conn, f.dataType))

  var rowCount = 0
  val batchSize = 2000
  val updateindex = numFields / 2

  try {
    dataFrame.foreachPartition(iterator => {
      // Iterate over the partition and submit in batches
      val ps = broad_ps.value
      try {
        while (iterator.hasNext) {
          val row = iterator.next()
          var i = 0
          while (i < numFields) {
            i < updateindex match {
              case true => {
                // First half of the placeholders: the INSERT ... VALUES part
                if (row.isNullAt(i)) {
                  ps.setNull(i + 1, nullTypes(i))
                } else {
                  setters(i).apply(ps, row, i, 0)
                }
              }
              case false => {
                // Second half: the UPDATE SET part, reading the same row values again
                if (row.isNullAt(i - updateindex)) {
                  ps.setNull(i + 1, nullTypes(i - updateindex))
                } else {
                  setters(i - updateindex).apply(ps, row, i, updateindex)
                }
              }
            }
            i = i + 1
          }
          ps.addBatch()
          rowCount += 1
          if (rowCount % batchSize == 0) {
            ps.executeBatch() // execute the batched upsert statements
            rowCount = 0
          }
        }
        if (rowCount > 0) {
          ps.executeBatch()
        }
      } finally {
        ps.close()
      }
    })
    conn.commit()
  } catch {
    case e: Exception =>
      logError("Error in execution of insert. " + e.getMessage)
      // insertError(connectionPool("OuCloud_ODS"), "insertOrUpdateToPgsql", e.getMessage)
  } finally {
    conn.close()
  }
}
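To make the placeholder layout concrete: for the test_001 schema used in the test case above (columns id, name, age, as in the doc comment), realsql would come out roughly as the statement below, with placeholders 1-3 filled from the row for the VALUES part and placeholders 4-6 filled from the same row again (offset by updateindex) for the UPDATE SET part:

INSERT INTO test_001_copy1 (id,name,age) VALUES (?,?,?) on conflict(id) do update set id=?,name=?,age=?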
The getJdbcType / makeSetter functions, lifted from the Spark source
private def getJdbcType(dt: DataType, dialect: JdbcDialect): JdbcType = {
  dialect.getJDBCType(dt).orElse(JdbcUtils.getCommonJDBCType(dt)).getOrElse(
    throw new IllegalArgumentException(s"Can't get JDBC type for ${dt.catalogString}"))
}

// Same idea as Spark's JDBCValueSetter, but with an extra offset argument:
// `pos` is the placeholder index in the full INSERT ... UPDATE statement,
// `currentpos` is 0 for the VALUES half and updateindex for the UPDATE SET half,
// so the row value is always read at pos - currentpos.
private type JDBCValueSetter_add = (PreparedStatement, Row, Int, Int) => Unit

private def makeSetter(conn: Connection, dataType: DataType): JDBCValueSetter_add = dataType match {
  case IntegerType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getInt(pos - currentpos))
  case LongType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setLong(pos + 1, row.getLong(pos - currentpos))
  case DoubleType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDouble(pos + 1, row.getDouble(pos - currentpos))
  case FloatType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setFloat(pos + 1, row.getFloat(pos - currentpos))
  case ShortType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getShort(pos - currentpos))
  case ByteType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setInt(pos + 1, row.getByte(pos - currentpos))
  case BooleanType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBoolean(pos + 1, row.getBoolean(pos - currentpos))
  case StringType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setString(pos + 1, row.getString(pos - currentpos))
  case BinaryType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos - currentpos))
  case TimestampType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos - currentpos))
  case DateType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos - currentpos))
  case t: DecimalType =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      stmt.setBigDecimal(pos + 1, row.getDecimal(pos - currentpos))
  case _ =>
    (stmt: PreparedStatement, row: Row, pos: Int, currentpos: Int) =>
      throw new IllegalArgumentException(
        s"Can't translate non-null value for field $pos")
}
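The connectionPool() helper used by insertOrUpdateToPgsql is not shown in the post. A minimal sketch of what it could look like, assuming a plain DriverManager connection built from the same pg.oucloud_ods.* settings (the name and the absence of real pooling are assumptions, not the author's code):

import java.sql.{Connection, DriverManager}
import com.typesafe.config.ConfigFactory

// Hypothetical stand-in: returns a single JDBC connection rather than a pooled one.
def connectionPool(): Connection = {
  val config = ConfigFactory.load()
  DriverManager.getConnection(
    config.getString("pg.oucloud_ods.url"),
    config.getString("pg.oucloud_ods.user"),
    config.getString("pg.oucloud_ods.password"))
}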