spark使用partition写入数据库 (Spark: writing to a database per-partition)
/**
 * Example: bulk-insert an RDD of ints into MySQL.
 *
 * Key idea: open ONE JDBC connection per *partition* (inside
 * `foreachPartition`), not one per record and not on the driver —
 * a driver-side connection is not serializable and would fail when
 * shipped to executors.
 *
 * NOTE(review): credentials/URL are hard-coded for the demo;
 * externalize them for real use.
 */
object mappartition写⼊数据库 extends App {

  val spark = SparkSession.builder()
    .appName("spark test function")
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
  val sc = spark.sparkContext

  // 1,000,000 rows across 2 partitions => 2 connections total.
  val a: RDD[Int] = sc.parallelize(1 to 1000000, 2)

  a.foreachPartition { iter =>
    // Runs on the executor: connection is created where it is used.
    val conn = java.sql.DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/mzf_sn?characterEncoding=utf8",
      "root",
      "root")
    try {
      // Parameterized statement: avoids SQL injection and re-parsing
      // the statement for every row.
      val stmt = conn.prepareStatement("insert into t_test(id) values (?)")
      try {
        var pending = 0
        while (iter.hasNext) {
          stmt.setInt(1, iter.next())
          stmt.addBatch()
          pending += 1
          // Flush periodically so the batch doesn't grow unbounded.
          if (pending % 1000 == 0) {
            stmt.executeBatch()
            pending = 0
          }
        }
        if (pending > 0) stmt.executeBatch() // flush the remainder
      } finally stmt.close()
    } finally conn.close() // always release the connection, even on failure
  }

  spark.stop()
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。