sparksql增删改差操作mysql_sparkdataframe处理数据增删改
查
1、配置⽂件
package config
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
case object conf {
private val master = "local[*]"
val confs: SparkConf = new SparkConf().setMaster(master).setAppName("jobs")url是什么格式文件
// val confs: SparkConf = new SparkConf().setMaster("laptop-2up1s8pr:4040/").setAppName("jobs")网页span标签
val sc = new SparkContext(confs)
sc.setLogLevel("ERROR")
val spark_session: SparkSession = SparkSession.builder()
.appName("jobs").config(confs).getOrCreate()
// 设置⽀持笛卡尔积 对于spark2.0来说
f.set("abled",value = true)
}
2、处理脚本
package sparkDataMange
f.{sc, spark_session}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode}
f.spark_session.implicits._
import org.apache.spark.sql.functions._
mysql菜鸟教程增删改查import org.apache.pes.DoubleType
object irisDataFrame {
def main(args: Array[String]): Unit = {
val st: Long = System.currentTimeMillis()
val path:String = "data/iris.data"
var df: DataFrame = ad.csv(path)
/*
* csv ⽂件数据内容如下
*
5.1,3.5,1.4,0.2,Iris-setosa
5.0,2.0,3.5,1.0,Iris-versicolor
6.2,3.4,5.4,2.3,Iris-virginica
* */
/*
+---+---+---+---+-----------+
|_c0|_c1|_c2|_c3| _c4|
+---+---+---+---+-----------+
*/
val ls: Column = when(col("_c4").equalTo("Iris-setosa"), "1").
when(col("_c4").equalTo("Iris-versicolor"), "2").
otherwise("3")
df = df.withColumn("_c4",ls)
df = df.lumns.map(f => df(f).cast(DoubleType)): _*)
df.show()
/*
* 处理结果
*
servlet容器图5.1,3.5,1.4,0.2,1
5.0,2.0,3.5,1.0,2随机摇号器网页版
docker基于什么作为引擎6.2,3.4,5.4,2.3,3
* */
df.printSchema()
spark_session.stop()
println("执⾏时间为:"+ (System.currentTimeMillis()-st)/Double +"s") }
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论