Skip to content

Commit d60b09b

Browse files
concretevitaminmarmbrus
authored andcommitted
[SPARK-2443][SQL] Fix slow read from partitioned tables
This fix obtains a comparable performance boost as [PR #1390](#1390) by moving an array update and deserializer initialization out of a potentially very long loop. Suggested by yhuai. The below results are updated for this fix. ## Benchmarks Generated a local text file with 10M rows of simple key-value pairs. The data is loaded as a table through Hive. Results are obtained on my local machine using hive/console. Without the fix: Type | Non-partitioned | Partitioned (1 part) ------------ | ------------ | ------------- First run | 9.52s end-to-end (1.64s Spark job) | 36.6s (28.3s) Stablized runs | 1.21s (1.18s) | 27.6s (27.5s) With this fix: Type | Non-partitioned | Partitioned (1 part) ------------ | ------------ | ------------- First run | 9.57s (1.46s) | 11.0s (1.69s) Stablized runs | 1.13s (1.10s) | 1.23s (1.19s) Author: Zongheng Yang <[email protected]> Closes #1408 from concretevitamin/slow-read-2 and squashes the following commits: d86e437 [Zongheng Yang] Move update & initialization out of potentially long loop.
1 parent 38ccd6e commit d60b09b

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

+7-3
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,17 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
164164
hivePartitionRDD.mapPartitions { iter =>
165165
val hconf = broadcastedHiveConf.value.value
166166
val rowWithPartArr = new Array[Object](2)
167+
168+
// The update and deserializer initialization are intentionally
169+
// kept out of the below iter.map loop to save performance.
170+
rowWithPartArr.update(1, partValues)
171+
val deserializer = localDeserializer.newInstance()
172+
deserializer.initialize(hconf, partProps)
173+
167174
// Map each tuple to a row object
168175
iter.map { value =>
169-
val deserializer = localDeserializer.newInstance()
170-
deserializer.initialize(hconf, partProps)
171176
val deserializedRow = deserializer.deserialize(value)
172177
rowWithPartArr.update(0, deserializedRow)
173-
rowWithPartArr.update(1, partValues)
174178
rowWithPartArr.asInstanceOf[Object]
175179
}
176180
}

0 commit comments

Comments
 (0)