spark删除mysql记录_在spark中操作mysql数据----spark学习
之七
使⽤spark的 DataFrame 来操作mysql数据。
DataFrame是⽐RDD更⾼⼀个级别的抽象,可以应⽤SQL语句进⾏操作,详细参考:
这⾥暂时使⽤spark-shell进⾏操作,
1.⾸先,必须要先下载⼀个mysql的jdbc的驱动
可以从这⾥下载
2.然后呢,就好办了。
#具体的启动spark-shell的⽅法(带上mysql的driver)
$~/spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar
#定义mysql的信息
val url="jdbc:mysql://10.181.176.226:3306/geo_info"val prop=new java.util.Properties
prop.setProperty("user","geo")
mysql下载jar包prop.setProperty("password","xxxxxx”)
#指定读取条件,这⾥ Array("country_code='CN'") 是where过滤条件
val cnFlight= ad.jdbc(url,"gps_location",Array("country_code='CN'"),prop)
#然后进⾏groupby 操作,获取数据集合
val emailList= upBy("gps_city", "user_mail”)
#计算数⽬,并根据数⽬进⾏降序排序
val sorted= unt().orderBy( desc("count") )
#显⽰前10条
sorted.show(10)
#存储到⽂件(这⾥会有很多分⽚⽂件。。。)
sorted.rdd.saveAsTextFile("/home/qingpingzhang/data/flight_top”)
#存储到mysql表⾥
sorted.write.jdbc(url,"table_name",prop)
3.具体⽂件编写代码,然后提交worker也类似,主要是DataFrame的 sqlContext声明会不⼀样。
val sc: SparkContext //An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
这⾥如果要⽤spark-submit,则会有坑,即便你是⽤sbt的assembly来打包的⼀个全的jar包:
[itelbog@iteblog ~]$ bin/spark-submit --master local[2] --driver-class-path lib/mysql-connector-java-5.1.35.jar --class spark.SparkToJDBC ./spark-test_2.10-1.0.jar

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。