3.Spark常见数据源
*以下内容由《Spark快速⼤数据分析》整理所得。
读书笔记的第三部分是讲的是Spark有哪些常见数据源?怎么读取它们的数据并保存。
Spark有三类常见的数据源:
⽂件格式与⽂件系统:它们是存储在本地⽂件系统或分布式⽂件系统(⽐如 NFS、HDFS、Amazon S3 等)中的数据,例如:⽂本⽂件、JSON、SequenceFile,以及 protocol buffer。
Spark SQL中的结构化数据源:它针对包括JSON和Apache Hive在内的结构化数据源。
数据库与键值存储:Spark ⾃带的库和⼀些第三⽅库,它们可以⽤来连接Cassandra、HBase、Elasticsearch以及JDBC源。
⼀、⽂件格式与⽂件系统
1. ⽂本⽂件
2. JSON
3. CSV
4. SequenceFile
⼆、Spark SQL中的结构化数据源
1. Hive
2. JSON
三、数据库与键值存储
⼀、⽂件格式与⽂件系统
1. ⽂本⽂件
⽂本⽂件读取:
# ⽅法1:⽂本⽂件读取
input = sc.textFile("file://home/holden/repos/sparks/README.md")
# ⽅法2:如果⽂件⾜够⼩,同时读取整个⽂件,从⽽返回⼀个pair RDD,其中键时输⼊⽂件的⽂件名
input = sc.wholeTextFiles("file://home/holden/salesFiles")
⽂本⽂件保存:
result.saveAsTextFile(outputFile)
2. JSON
JSON读取:
# JSON读取
import json
data = input.map(lambda x: json.loads(x))
JSON保存:
# JSON保存 - 举例选出喜爱熊猫的⼈
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)) # 保存⽂本⽂件
result.SaveAsTextFile(outputFilePath)
3. CSV
CSV读取:
import csv
import StringIO
# CSV读取 - 如果数据字段均没有包括换⾏符,只能⼀⾏⾏读取
def loadRecord(line):
"""解析⼀⾏CSV记录"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
()
input = sc.textFile(inputFile).map(loadRecord)
# CSV读取 - 如果数据字段嵌有换⾏符,需要完整读⼊每个⽂件
def loadRecords(fileNameContents):
"""读取给定⽂件中的所有记录"""
input = StringIO.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
CSV保存:
# CSV保存
def writeRecords(records):
"""写出⼀些CSV记录"""
output = StringIO.StringIO()
writer = csv.DictWriter(output, fieldnames=["names", "favoriteAnimal"])
for record in records:
writer.writerow(record)
return [value()]
python怎么读取json文件pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
4. SequenceFile
SequenceFile读取:
# sc.sequenceFile(path, keyClass, valueClass)
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable") SequenceFile保存(⽤Scala):
val data = sc.parallelize(List(("Pandas", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
⼆、Spark SQL中的结构化数据源
⽤Spark SQL从多种数据源⾥读取数据:
1. Hive
⽤Spark SQL连接已有的Hive:
(1.1)需要将l⽂件复制到 Spark 的 ./conf/ ⽬录下;
(1.2)再创建出HiveContext对象,也就是 Spark SQL 的⼊⼝;
(1.3)使⽤Hive查询语⾔(HQL)来对你的表进⾏查询。
# 例⼦:⽤Python创建HiveContext并查询数据
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print firstRow.name
2. JSON
(2.1)和使⽤Hive⼀样创建⼀个HiveContext。(不过在这种情况下我们不需要安装好Hive,也就是说你也不需要l⽂件。);(2.2)使⽤HiveContext.jsonFile⽅法来从整个⽂件中获取由Row对象组成的RDD。
(2.3)除了使⽤整个Row对象,你也可以将RDD注册为⼀张表,然后从中选出特定的字段。
# 例⼦:在Python中使⽤Spark SQL读取JSON数据
tweets = hiveCtx.jsonFile("tweets.json")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")
三、数据库与键值存储
关于Cassandra、HBase、Elasticsearch以及JDBC源的数据库连接,详情请参考书本81-86页内容。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论