Spark code in Java that reads a CSV file, written in IDEA. Environment: IDEA 2019.3.3, JDK 1.8, Maven project.
Straight to the code:
1. Configure pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>JavaDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.3.1</spark.version>
<scala.version>2.11</scala.version>
<!-- The settings below fix the packaging error under JDK 1.8: "lambda expressions are not supported in -source 1.5" -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_${scala.version}</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib-local_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- SparkStreaming + Kafka -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.3.1</version>
</dependency>
<!-- Needed to produce data to Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
<!-- Needed to connect to Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
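As an alternative to the four properties above, the JDK level can also be pinned with an explicit maven-compiler-plugin entry inside <project>. A minimal sketch (standard plugin coordinates; 3.8.1 is one commonly used release, pick whichever version your build already uses):

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>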
------------------------------------------------------------------------ Java code:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.HashMap;
public class SparkCsvDemo {
public static void main(String[] args) {
String hdfsInAddress = "D:\\AWS\\";//"hdfs://192.168.209.129:9000/";
String inputAddress = "";
String csvFileName="emr-demo-data.csv";
SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark");
System.out.println("==================");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
HashMap<String, String> options = new HashMap<String, String>();
options.put("header", "true");
//options.put("path", hdfsInAddress + inputAddress + filePath);
options.put("path", hdfsInAddress + inputAddress + csvFileName);
System.out.println("Path of the input file on HDFS: " + hdfsInAddress + inputAddress + csvFileName);
// SQLContext.load(...) was removed in Spark 2.x; read through the DataFrameReader API instead
Dataset<Row> dataFrame = sqlContext.read().format("com.databricks.spark.csv").options(options).load();
// DataFrame cars = (new CsvParser()).withUseHeader(true).csvFile(sqlContext, "cars.csv"); // alternative: read the CSV through spark-csv's CsvParser helper
// register the DataFrame as a temp table so it can be queried with SQL
dataFrame.registerTempTable("prodinst");
// DistributorTier is a column header in the CSV file; the column contains values such as 'T1'
// the CSV file must be UTF-8 encoded; re-encode it first if necessary (see the sketch below)
Dataset<Row> resultFrame = sqlContext.sql("select * from prodinst where DistributorTier = 'T1'");
System.out.println("*************** Query result (first 10 rows) ***************");
resultFrame.limit(10).show(20, false);
sc.stop();
}
}
------------------------------------------------------------------------ Running the program prints the CSV result -----------------------
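The comments in the code stress that the CSV must be UTF-8 encoded. A minimal re-encoding sketch in plain Java, assuming the source file happens to be GBK-encoded (the charset and the -gbk file name are assumptions; adjust both to your data):

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CsvToUtf8 {
    public static void main(String[] args) throws IOException {
        // read the raw bytes and decode them with the original (assumed) encoding
        byte[] raw = Files.readAllBytes(Paths.get("D:\\AWS\\emr-demo-data-gbk.csv"));
        String text = new String(raw, Charset.forName("GBK"));
        // write the same text back out as UTF-8 for Spark to consume
        Files.write(Paths.get("D:\\AWS\\emr-demo-data.csv"), text.getBytes(Charset.forName("UTF-8")));
    }
}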

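Note that the com.databricks spark-csv dependency is only needed on Spark 1.x; Spark 2.x ships a built-in CSV data source, so the same job can be written against SparkSession with no extra package. A minimal sketch under that assumption (same sample path as the code above):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCsvNativeDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("TestSparkNativeCsv")
                .getOrCreate();

        // the built-in reader understands the same "header" option as spark-csv
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .csv("D:\\AWS\\emr-demo-data.csv");

        df.createOrReplaceTempView("prodinst");
        spark.sql("select * from prodinst where DistributorTier = 'T1'").show(20, false);

        spark.stop();
    }
}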