Mysql学习(三)Spark(Scala)写⼊Mysql的两种⽅式
package total
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.pes.{LongType, StringType, StructField, StructType}
object CountProvince {
def main(args: Array[String]): Unit = {
/**
* 第⼀种写⼊Mysql⽅式
*/
/**
* 第⼀步判断参数个数
*/
if (args.length < 2){
println(
"""
|total.CountProvince<inputFilePath><outputFilePath>
|<inputFilePath> 输⼊的⽂件路径
|<outputFilePath> 输出的⽂件路径
""".stripMargin
)
}
/**
* 接收参数
*/
val Array(inputFile,outputFile) = args
val conf = new SparkConf().setAppName(s"${SimpleName}")
// val sc = new SparkContext(conf)
// val input = ("F:\\Scala\\Spark\\第⼆⼗天-dmp项⽬\\资料\\out1")
/**
* 创建SparkSession
*/
val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
/**
* 读取⽂件的数据
*/
val df: DataFrame = ad.parquet(inputFile)
/**
* 创建⼀个临时表
*/
/**
* 统计出来count
*/
val sql =
"""
select
count(*),provincename, cityname
from
countProvince
group by
group by
provincename, cityname
order by
provincename"""
val dff = session.sqlContext.sql(sql)
val url = "jdbc:mysql://192.168.123.102:3306/test"
val properties = new Properties()
properties.put("user","root")
properties.put("password","root")
dff.write.jdbc(url,"count",properties)
/**
* 第⼆种写⼊Mysql⽅式
*/
/
/ val conf = new SparkConf().setMaster("local[2]").setAppName("CountProvince") val sc = new SparkContext(conf)
// val Spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() // val sc = Spark.sparkContext
// /**
// * 读取⽂件
// */
// val inputPath = sc.textFile("F:\\Scala\\Spark\\第⼆⼗天-dmp项⽬\\资料\\") // inputPath.foreach(println(_))
// /**
// * 计算count
// */
/
/ val countRDD = inputPath
// .map(line => {
// val fields = line.split(",")
// (fields(24) + ":" + fields(25), 1)
// }).reduceByKey(_ + _)
//
// /**
// * ⽤Row来转换
// */
// val RowRdd: RDD[Row] = countRDD.map(tuple => {
// val diming = tuple._1
/
/ val count = tuple._2
// Row(diming, count)
// })
// /**mysql下载app
// * 再⽤schema
// */
// val schema: StructType = StructType(
// StructField("diming", StringType, true) ::
// StructField("count", LongType, true) :: Nil
// )
// /**
/
/ * 把 Row和 schema 合并成为 DataFrame
// */
// val df: DataFrame = ateDataFrame(RowRdd,schema)
//
// /**
// * 创建⼀个临时表
// */
// df.createOrReplaceTempView("countProvince")
//
// /**
// * 把结果持久化到数据库
/
/ */
// val url = "jdbc:mysql://192.168.123.102:3306/test"
// val properties = new Properties()
// properties.put("user","root")
// properties.put("password","root")
// df.write.jdbc(url,"count2",properties)
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论