
Commit deabe06

chenghao-intel authored and conviva-zz committed
[SPARK-2523] [SQL] Hadoop table scan bug fixing
In HiveTableScan.scala, a single ObjectInspector was created and applied to all partition-based records, which can cause a ClassCastException when the object inspector is not identical across the table and its partitions. This is the follow-up of apache#1408 and apache#1390.

I ran a micro benchmark locally with 15,000,000 records in total, and got the results below:

With This Patch | Partition-Based Table | Non-Partition-Based Table
--------------- | --------------------- | -------------------------
No | 1927 ms | 1885 ms
Yes | 1541 ms | 1524 ms

This shows the patch also improves performance.

PS: the benchmark code is attached. (thanks liancheng)

```scala
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveTableScanPrepare extends App {
  case class Record(key: String, value: String)

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i")))

  import hiveContext._

  hql("SHOW TABLES")
  hql("DROP TABLE if exists part_scan_test")
  hql("DROP TABLE if exists scan_test")
  hql("DROP TABLE if exists records")
  rdd.registerAsTable("records")

  hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING)
         | ROW FORMAT SERDE
         | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
         | STORED AS RCFILE
       """.stripMargin)
  hql("""CREATE TABLE scan_test (key STRING, value STRING)
         | ROW FORMAT SERDE
         | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
         | STORED AS RCFILE
       """.stripMargin)

  for (part1 <- 2000 until 2001) {
    for (part2 <- 1 to 5) {
      hql(s"""from records
             | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2')
             | select key, value
           """.stripMargin)
      hql(s"""from records
             | insert into table scan_test select key, value
           """.stripMargin)
    }
  }
}

object HiveTableScanTest extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("SHOW TABLES")

  val part_scan_test = hql("select key, value from part_scan_test")
  val scan_test = hql("select key, value from scan_test")

  val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test))
  val r_scan_test = (0 to 5).map(i => benchmark(scan_test))

  println("Scanning Partition-Based Table")
  r_part_scan_test.foreach(printResult)
  println("Scanning Non-Partition-Based Table")
  r_scan_test.foreach(printResult)

  def printResult(result: (Long, Long)) {
    println(s"Duration: ${result._1} ms Result: ${result._2}")
  }

  def benchmark(srdd: SchemaRDD) = {
    val begin = System.currentTimeMillis()
    val result = srdd.count()
    val end = System.currentTimeMillis()
    ((end - begin), result)
  }
}
```

Author: Cheng Hao <[email protected]>

Closes apache#1439 from chenghao-intel/hadoop_table_scan and squashes the following commits:

888968f [Cheng Hao] Fix issues in code style
27540ba [Cheng Hao] Fix the TableScan Bug while partition serde differs
40a24a7 [Cheng Hao] Add Unit Test
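To make the failure mode concrete, here is a small illustrative sketch (the helper names and the single-field access are hypothetical, not code from the patch): before the patch the reader inspected every partition's rows with the table-level SerDe's inspector; after the patch each partition's rows are inspected with that partition's own deserializer.

```scala
import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.io.Writable

// Hypothetical helpers, for illustration only.
object InspectorMismatchSketch {

  // Pre-patch shape: one inspector derived from the *table* SerDe is applied to rows
  // produced by a *partition* deserializer. If the partition was written with a different
  // SerDe (e.g. the table's SerDe changed after the partition was created), the in-memory
  // layout no longer matches and getStructFieldData can throw a ClassCastException.
  def prePatch(table: HiveTable, partDeserializer: Deserializer, raw: Writable): Any = {
    val tableOI = table.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
    val row = partDeserializer.deserialize(raw)    // partition-specific representation
    val ref = tableOI.getAllStructFieldRefs.get(0)
    tableOI.getStructFieldData(row, ref)           // may throw ClassCastException
  }

  // Post-patch shape: the inspector comes from the partition's own deserializer, so the
  // layout it expects always matches what deserialize() produced.
  def postPatch(partDeserializer: Deserializer, raw: Writable): Any = {
    val partOI = partDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
    val row = partDeserializer.deserialize(raw)
    val ref = partOI.getAllStructFieldRefs.get(0)
    partOI.getStructFieldData(row, ref)
  }
}
```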
1 parent 8d0466d commit deabe06

File tree

4 files changed: +138 -115 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 81 additions & 32 deletions
```diff
@@ -24,20 +24,25 @@ import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
 import org.apache.spark.SerializableWritable
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast}
+import org.apache.spark.sql.catalyst.types.DataType
+
 /**
  * A trait for subclasses that handle table scans.
  */
 private[hive] sealed trait TableReader {
-  def makeRDDForTable(hiveTable: HiveTable): RDD[_]
+  def makeRDDForTable(hiveTable: HiveTable): RDD[Row]
 
-  def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]
+  def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row]
 }
 
 
@@ -46,7 +51,10 @@ private[hive] sealed trait TableReader {
  * data warehouse directory.
  */
 private[hive]
-class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveContext)
+class HadoopTableReader(
+    @transient attributes: Seq[Attribute],
+    @transient relation: MetastoreRelation,
+    @transient sc: HiveContext)
   extends TableReader {
 
   // Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
@@ -63,10 +71,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
 
   def hiveConf = _broadcastedHiveConf.value.value
 
-  override def makeRDDForTable(hiveTable: HiveTable): RDD[_] =
+  override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
     makeRDDForTable(
       hiveTable,
-      _tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
+      relation.tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
       filterOpt = None)
 
   /**
@@ -81,14 +89,14 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
   def makeRDDForTable(
       hiveTable: HiveTable,
       deserializerClass: Class[_ <: Deserializer],
-      filterOpt: Option[PathFilter]): RDD[_] = {
+      filterOpt: Option[PathFilter]): RDD[Row] = {
 
     assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table,
       since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""")
 
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
-    val tableDesc = _tableDesc
+    val tableDesc = relation.tableDesc
     val broadcastedHiveConf = _broadcastedHiveConf
 
     val tablePath = hiveTable.getPath
@@ -99,23 +107,20 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
       .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
     val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
 
+    val attrsWithIndex = attributes.zipWithIndex
+    val mutableRow = new GenericMutableRow(attrsWithIndex.length)
     val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
       val hconf = broadcastedHiveConf.value.value
       val deserializer = deserializerClass.newInstance()
       deserializer.initialize(hconf, tableDesc.getProperties)
 
-      // Deserialize each Writable to get the row value.
-      iter.map {
-        case v: Writable => deserializer.deserialize(v)
-        case value =>
-          sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}")
-      }
+      HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
     }
 
     deserializedHadoopRDD
   }
 
-  override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] = {
+  override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = {
     val partitionToDeserializer = partitions.map(part =>
       (part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap
     makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None)
@@ -132,9 +137,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
    * subdirectory of each partition being read. If None, then all files are accepted.
    */
   def makeRDDForPartitionedTable(
-      partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
-      filterOpt: Option[PathFilter]): RDD[_] = {
-
+      partitionToDeserializer: Map[HivePartition,
+        Class[_ <: Deserializer]],
+      filterOpt: Option[PathFilter]): RDD[Row] = {
     val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getPartitionPath
@@ -156,33 +161,42 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
       }
 
       // Create local references so that the outer object isn't serialized.
-      val tableDesc = _tableDesc
+      val tableDesc = relation.tableDesc
       val broadcastedHiveConf = _broadcastedHiveConf
       val localDeserializer = partDeserializer
+      val mutableRow = new GenericMutableRow(attributes.length)
+
+      // split the attributes (output schema) into 2 categories:
+      // (partition keys, ordinal), (normal attributes, ordinal), the ordinal mean the
+      // index of the attribute in the output Row.
+      val (partitionKeys, attrs) = attributes.zipWithIndex.partition(attr => {
+        relation.partitionKeys.indexOf(attr._1) >= 0
+      })
+
+      def fillPartitionKeys(parts: Array[String], row: GenericMutableRow) = {
+        partitionKeys.foreach { case (attr, ordinal) =>
+          // get partition key ordinal for a given attribute
+          val partOridinal = relation.partitionKeys.indexOf(attr)
+          row(ordinal) = Cast(Literal(parts(partOridinal)), attr.dataType).eval(null)
+        }
+      }
+      // fill the partition key for the given MutableRow Object
+      fillPartitionKeys(partValues, mutableRow)
 
       val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
       hivePartitionRDD.mapPartitions { iter =>
         val hconf = broadcastedHiveConf.value.value
-        val rowWithPartArr = new Array[Object](2)
-
-        // The update and deserializer initialization are intentionally
-        // kept out of the below iter.map loop to save performance.
-        rowWithPartArr.update(1, partValues)
         val deserializer = localDeserializer.newInstance()
         deserializer.initialize(hconf, partProps)
 
-        // Map each tuple to a row object
-        iter.map { value =>
-          val deserializedRow = deserializer.deserialize(value)
-          rowWithPartArr.update(0, deserializedRow)
-          rowWithPartArr.asInstanceOf[Object]
-        }
+        // fill the non partition key attributes
+        HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow)
       }
     }.toSeq
 
     // Even if we don't use any partitions, we still need an empty RDD
     if (hivePartitionRDDs.size == 0) {
-      new EmptyRDD[Object](sc.sparkContext)
+      new EmptyRDD[Row](sc.sparkContext)
     } else {
       new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
     }
@@ -225,10 +239,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
-
 }
 
-private[hive] object HadoopTableReader {
+private[hive] object HadoopTableReader extends HiveInspectors {
   /**
    * Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
    * instantiate a HadoopRDD.
@@ -241,4 +254,40 @@ private[hive] object HadoopTableReader {
     val bufferSize = System.getProperty("spark.buffer.size", "65536")
     jobConf.set("io.file.buffer.size", bufferSize)
   }
+
+  /**
+   * Transform the raw data(Writable object) into the Row object for an iterable input
+   * @param iter Iterable input which represented as Writable object
+   * @param deserializer Deserializer associated with the input writable object
+   * @param attrs Represents the row attribute names and its zero-based position in the MutableRow
+   * @param row reusable MutableRow object
+   *
+   * @return Iterable Row object that transformed from the given iterable input.
+   */
+  def fillObject(
+      iter: Iterator[Writable],
+      deserializer: Deserializer,
+      attrs: Seq[(Attribute, Int)],
+      row: GenericMutableRow): Iterator[Row] = {
+    val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+    // get the field references according to the attributes(output of the reader) required
+    val fieldRefs = attrs.map { case (attr, idx) => (soi.getStructFieldRef(attr.name), idx) }
+
+    // Map each tuple to a row object
+    iter.map { value =>
+      val raw = deserializer.deserialize(value)
+      var idx = 0;
+      while (idx < fieldRefs.length) {
+        val fieldRef = fieldRefs(idx)._1
+        val fieldIdx = fieldRefs(idx)._2
+        val fieldValue = soi.getStructFieldData(raw, fieldRef)
+
+        row(fieldIdx) = unwrapData(fieldValue, fieldRef.getFieldObjectInspector())
+
+        idx += 1
+      }
+
+      row: Row
+    }
+  }
 }
```
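`fillObject` relies on `unwrapData` from the `HiveInspectors` trait (now mixed into the companion object) to convert each inspected field into a Catalyst-friendly value. The diff does not show that helper; roughly, it walks the object-inspector hierarchy as in the simplified sketch below (the real trait also wraps nested structs in a Catalyst row, so treat this as an approximation rather than the actual implementation):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.serde2.objectinspector.{
  ListObjectInspector, MapObjectInspector, ObjectInspector,
  PrimitiveObjectInspector, StructObjectInspector}

object HiveUnwrapSketch {
  // Recursively turns a Hive-inspected value into a plain Scala value.
  def unwrapData(data: Any, oi: ObjectInspector): Any = oi match {
    case pi: PrimitiveObjectInspector =>
      // e.g. Text -> String, IntWritable -> java.lang.Integer
      pi.getPrimitiveJavaObject(data)
    case li: ListObjectInspector =>
      Option(li.getList(data))
        .map(_.asScala.map(unwrapData(_, li.getListElementObjectInspector)).toSeq)
        .orNull
    case mi: MapObjectInspector =>
      Option(mi.getMap(data))
        .map(_.asScala.map { case (k, v) =>
          unwrapData(k, mi.getMapKeyObjectInspector) ->
            unwrapData(v, mi.getMapValueObjectInspector)
        }.toMap)
        .orNull
    case si: StructObjectInspector =>
      // the real implementation builds a Catalyst row here
      si.getAllStructFieldRefs.asScala
        .map(r => unwrapData(si.getStructFieldData(data, r), r.getFieldObjectInspector))
  }
}
```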

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala

Lines changed: 7 additions & 83 deletions
```diff
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
-import org.apache.spark.util.MutablePair
 
 /**
  * :: DeveloperApi ::
@@ -50,8 +49,7 @@ case class HiveTableScan(
     relation: MetastoreRelation,
     partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
-  extends LeafNode
-  with HiveInspectors {
+  extends LeafNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
@@ -67,42 +65,7 @@ case class HiveTableScan(
   }
 
   @transient
-  private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
-
-  /**
-   * The hive object inspector for this table, which can be used to extract values from the
-   * serialized row representation.
-   */
-  @transient
-  private[this] lazy val objectInspector =
-    relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
-
-  /**
-   * Functions that extract the requested attributes from the hive output. Partitioned values are
-   * casted from string to its declared data type.
-   */
-  @transient
-  protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
-    attributes.map { a =>
-      val ordinal = relation.partitionKeys.indexOf(a)
-      if (ordinal >= 0) {
-        val dataType = relation.partitionKeys(ordinal).dataType
-        (_: Any, partitionKeys: Array[String]) => {
-          castFromString(partitionKeys(ordinal), dataType)
-        }
-      } else {
-        val ref = objectInspector.getAllStructFieldRefs
-          .find(_.getFieldName == a.name)
-          .getOrElse(sys.error(s"Can't find attribute $a"))
-        val fieldObjectInspector = ref.getFieldObjectInspector
-
-        (row: Any, _: Array[String]) => {
-          val data = objectInspector.getStructFieldData(row, ref)
-          unwrapData(data, fieldObjectInspector)
-        }
-      }
-    }
-  }
+  private[this] val hadoopReader = new HadoopTableReader(attributes, relation, context)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
@@ -114,6 +77,7 @@ case class HiveTableScan(
     val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
 
     if (attributes.size == relation.output.size) {
+      // SQLContext#pruneFilterProject guarantees no duplicated value in `attributes`
      ColumnProjectionUtils.setFullyReadColumns(hiveConf)
     } else {
       ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
@@ -140,12 +104,6 @@ case class HiveTableScan(
 
   addColumnMetadataToConf(context.hiveconf)
 
-  private def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
-    hadoopReader.makeRDDForTable(relation.hiveQlTable)
-  } else {
-    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
-  }
-
   /**
    * Prunes partitions not involve the query plan.
    *
@@ -169,44 +127,10 @@ case class HiveTableScan(
     }
   }
 
-  override def execute() = {
-    inputRdd.mapPartitions { iterator =>
-      if (iterator.isEmpty) {
-        Iterator.empty
-      } else {
-        val mutableRow = new GenericMutableRow(attributes.length)
-        val mutablePair = new MutablePair[Any, Array[String]]()
-        val buffered = iterator.buffered
-
-        // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
-        // matching are avoided intentionally.
-        val rowsAndPartitionKeys = buffered.head match {
-          // With partition keys
-          case _: Array[Any] =>
-            buffered.map { case array: Array[Any] =>
-              val deserializedRow = array(0)
-              val partitionKeys = array(1).asInstanceOf[Array[String]]
-              mutablePair.update(deserializedRow, partitionKeys)
-            }
-
-          // Without partition keys
-          case _ =>
-            val emptyPartitionKeys = Array.empty[String]
-            buffered.map { deserializedRow =>
-              mutablePair.update(deserializedRow, emptyPartitionKeys)
-            }
-        }
-
-        rowsAndPartitionKeys.map { pair =>
-          var i = 0
-          while (i < attributes.length) {
-            mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
-            i += 1
-          }
-          mutableRow: Row
-        }
-      }
-    }
-  }
+  override def execute() = if (!relation.hiveQlTable.isPartitioned) {
+    hadoopReader.makeRDDForTable(relation.hiveQlTable)
+  } else {
+    hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
+  }
 
   override def output = attributes
```
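With this change, `execute()` simply returns whichever `RDD[Row]` the reader builds. A quick way to exercise both branches is to scan one partitioned and one non-partitioned table; the sketch below assumes the tables created by the `HiveTableScanPrepare` benchmark in the commit message already exist in the local metastore.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.LocalHiveContext

object ScanBothPaths extends App {
  val sparkContext = new SparkContext(
    new SparkConf().setMaster("local").setAppName("ScanBothPaths"))
  val hiveContext = new LocalHiveContext(sparkContext)
  import hiveContext._

  // Partitioned table: HiveTableScan delegates to makeRDDForPartitionedTable.
  println(hql("SELECT key, value FROM part_scan_test").count())
  // Non-partitioned table: HiveTableScan delegates to makeRDDForTable.
  println(hql("SELECT key, value FROM scan_test").count())
}
```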
```diff
@@ -0,0 +1,2 @@
+100 100 2010-01-01
+200 200 2010-01-02
```
