Description
5.1 动机
Spark 支持很多种输入输出源。一部分原因是 Spark 本身是基于 Hadoop 生态圈而构建,特别是 Spark 可以通过 Hadoop MapReduce 所使用的 InputFormat 和 OutputFormat 接口访问数据,而大部分常见的文件格式与存储系统(例如 S3、HDFS、Cassandra、HBase 等)都支持这种接口。
-
文件格式与文件系统
对于存储在本地文件系统或分布式文件系统(比如 NFS、HDFS、Amazon S3 等)中的 数据,Spark 可以访问很多种不同的文件格式,包括文本文件、JSON、SequenceFile, 以及 protocol buffer。我们会展示几种常见格式的用法,以及 Spark 针对不同文件系统 的配置和压缩选项。
-
Spark SQL中的结构化数据源
第 9 章会介绍 Spark SQL 模块,它针对包括 JSON 和 Apache Hive 在内的结构化数据 源,为我们提供了一套更加简洁高效的 API。此处会粗略地介绍一下如何使用 Spark SQL,而大部分细节将留到第 9 章讲解。
-
数据库与键值存储
本章还会概述 Spark 自带的库和一些第三方库,它们可以用来连接 Cassandra、HBase、 Elasticsearch 以及 JDBC 源。
5.2 文件格式
表5-1: Spark支持的一些常见格式
格式名称 | 结构化 | 备注 |
---|---|---|
文本文件 | 否 | 普通的文本文件,每行一条记录 |
JSON | 半结构化 | 常见的基于文本的格式,半结构化;大多数库都要求每行一条记录 |
CSV | 是 | 非常常见的基于文本的格式,通常在电子表格应用中使用 |
SequenceFiles | 是 | 一种用于键值对数据的常见 Hadoop 文件格式 |
Protocol buffers | 是 | 一种快速、节约空间的跨语言格式 |
对象文件 | 是 | 用来将 Spark 作业中的数据存储下来以让共享的代码读取。改变类的时候 它会失效,因为它依赖于 Java 序列化 |
5.2.1 文本文件
- 读取文本文件
python:
input = sc.textFile("file:///home/holden/repos/spark/README.md")
scala:
val input = sc.textFile("file:///home/holden/repos/spark/README.md")
java:
JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
希望同时处理整个文件,使用 SparkContext. wholeTextFiles() 方法 。Spark 支持读取给定目录中的所有文件,以及在输入路径中使用通配字符 (如 part-*.txt)
- 保存文本文件
result.saveAsTextFile(outputFile)
5.2.2 JSON
- 读取JSON
python:
import json
data = input.map(lambda x: json.loads(x))
- 保存JSON
写出 JSON 文件比读取它要简单得多,因为不需要考虑格式错误的数据,并且也知道要写 出的数据的类型。可以使用之前将字符串 RDD 转为解析好的 JSON 数据的库,将由结构 化数据组成的 RDD 转为字符串 RDD,然后使用 Spark 的文本文件 API 写出去。
python:
(data.filter(lambda x: x["lovesPandas"]).map(lambda x: json.dumps(x))
.saveAsTextFile(outputFile))
scala:
result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
.saveAsTextFile(outputFile)
5.2.3 CSV和TSV
- 读取CSV
读取 CSV/TSV 数据和读取 JSON 数据相似,都需要先把文件当作普通文本文件来读取数据,再对数据进行处理。由于格式标准的缺失,同一个库的不同版本有时也会用不同的方式处理输入数据。
python:
import csv
import StringIO
def loadRecord(line):
"""解析一行CSV记录"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
scala:
import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
在 Python 中完整读取 CSV :
def loadRecords(fileNameContents):
"""读取给定文件中的所有记录"""
input = StringIO.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
在 Scala 中完整读取 CSV:
case class Person(name: String, favoriteAnimal: String)
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll().map(x => Person(x(0), x(1))) }
- 保存CSV
python:
def writeRecords(records):
"""写出一些CSV记录"""
output = StringIO.StringIO()
writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
for record in records:
writer.writerow(record) return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
scala:
pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray) .mapPartitions{people =>
val stringWriter = new StringWriter();
val csvWriter = new CSVWriter(stringWriter);
csvWriter.writeAll(people.toList)
Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)
5.2.4 SequenceFile
Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile(path, keyClass, valueClass, minPartitions)。前面提到过,SequenceFile 使用 Writable 类,因 此 keyClass 和 valueClass 参数都必须使用正确的 Writable 类。
- 读取SequenceFile
python:
data = sc.sequenceFile(inFile, "org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable")
scala:
val data = sc.sequenceFile(inFile, classOf[Text], classOf[IntWritable]).
map{case (x, y) => (x.toString, y.get())}
- 保存SequenceFile
scala:
val data = sc.parallelize(List(("Panda", 3), ("Kay", 6), ("Snail", 2)))
data.saveAsSequenceFile(outputFile)
5.2.5 对象文件
5.2.6 Hadoop输入输出格式
- 读取其他Hadoop输入格式
scala:
新式API
val input = sc.newAPIHadoopFile(inputFile, classOf[LzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
// "输入"中的每个MapWritable代表一个JSON对象
- 保存Hadoop输出格式
saveAsNewAPIHadoopFile
5.2.7 文件压缩
5.3 文件系统
5.3.1 本地/“常规”文件系统
Spark 支持从本地文件系统中读取文件,不过它要求文件在集群中所有节点的相同路径下都可以找到。
5.3.2 Amazon S3
5.3.3 HDFS
在 Spark 中使用 HDFS 只需要将输入输出路径指定为 hdfs://master:port/path 就够了。 需要考虑Hadoop和spark的版本。
5.4 Spark SQL中的结构化数据
5.4.1 Apache Hive
Apache Hive 是 Hadoop 上的一种常见的结构化数据源。Hive 可以在 HDFS 内或者在其他存储系统上存储多种格式的表。这些格式从普通文本到列式存储格式,应有尽有。Spark SQL 可以读取 Hive 支持的任何表。
要把 Spark SQL 连接到已有的 Hive 上,你需要提供 Hive 的配置文件。你需要将 hive-site.xml 文件复制到 Spark 的 ./conf/ 目录下。这样做好之后,再创建出 HiveContext 对象,也就是 Spark SQL 的入口,然后你就可以使用Hive 查询语言(HQL)来对你的表进行查询, 并以由行组成的 RDD 的形式拿到返回数据。
python:
from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("SELECT name, age FROM users")
firstRow = rows.first()
print(firstRow.name)
scala:
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new org.apache.spark.sql.hive.HiveContext(sc)
val rows = hiveCtx.sql("SELECT name, age FROM users")
val firstRow = rows.first()
println(firstRow.getString(0)) // 字段0是name字段
5.4.2 JSON
json推文:
{"user": {"name": "Holden", "location": "San Francisco"}, "text": "Nice day out today"}
{"user": {"name": "Matei", "location": "Berkeley"}, "text": "Even nicer here :)"}
python:
tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
results = hiveCtx.sql("SELECT user.name, text FROM tweets")
scala:
val tweets = hiveCtx.jsonFile("tweets.json")
tweets.registerTempTable("tweets")
val results = hiveCtx.sql("SELECT user.name, text FROM tweets")
5.5 数据库
通过数据库提供的 Hadoop 连接器或者自定义的 Spark 连接器,Spark 可以访问一些常用的数据库系统。
5.5.1 Java数据库连接
5.5.2 Cassandra
5.5.3 HBase
5.5.4 Elasticsearch
Spark 可以使用 Elasticsearch-Hadoop从 Elasticsearch
中读写数据。Elasticsearch 是一个开源的、基于 Lucene 的搜索系统。