Skip to content

Commit 6c1106f

Browse files
committed
HIVE-29133: Support Z-order indexing for Iceberg tables via CREATE TABLE DDL
1 parent c338904 commit 6c1106f

File tree

15 files changed

+1157
-21
lines changed

15 files changed

+1157
-21
lines changed

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/BaseHiveIcebergMetaHook.java

Lines changed: 61 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
4242
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFieldDesc;
4343
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.SortFields;
44+
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZOrderFieldDesc;
45+
import org.apache.hadoop.hive.ql.ddl.misc.sortoder.ZorderFields;
4446
import org.apache.hadoop.hive.ql.util.NullOrdering;
4547
import org.apache.iceberg.BaseMetastoreTableOperations;
4648
import org.apache.iceberg.BaseTable;
@@ -217,31 +219,73 @@ private void validateCatalogConfigsDefined() {
217219
}
218220
}
219221

222+
/**
223+
* Persists the table's write sort order based on the HMS property 'default-sort-order'
224+
* that is populated by the DDL layer.
225+
*
226+
* Behaviour:
227+
* - If the JSON represents Z-order, we remove DEFAULT_SORT_ORDER
228+
* as Iceberg does not have Z-order support in its spec.
229+
* So, we persist Z-order metadata in 'sort.order' and 'sort.columns' to be used by Hive Writer.
230+
* - Otherwise, the JSON is a list of SortFields; we convert it to Iceberg
231+
* SortOrder JSON and keep it in DEFAULT_SORT_ORDER for Iceberg to use it.
232+
*/
220233
private void setSortOrder(org.apache.hadoop.hive.metastore.api.Table hmsTable, Schema schema,
221234
Properties properties) {
222-
String sortOderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
223-
SortFields sortFields = null;
224-
if (!Strings.isNullOrEmpty(sortOderJSONString)) {
235+
String sortOrderJSONString = hmsTable.getParameters().get(TableProperties.DEFAULT_SORT_ORDER);
236+
if (!Strings.isNullOrEmpty(sortOrderJSONString)) {
225237
try {
226-
sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOderJSONString, SortFields.class);
238+
if (isZOrderJSON(sortOrderJSONString)) {
239+
properties.remove(TableProperties.DEFAULT_SORT_ORDER);
240+
ZorderFields zorderFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, ZorderFields.class);
241+
if (zorderFields != null && !zorderFields.getZOrderFields().isEmpty()) {
242+
setZOrderSortOrder(zorderFields, properties);
243+
}
244+
} else {
245+
SortFields sortFields = JSON_OBJECT_MAPPER.reader().readValue(sortOrderJSONString, SortFields.class);
246+
if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
247+
SortOrder.Builder sortOrderBuilder = SortOrder.builderFor(schema);
248+
sortFields.getSortFields().forEach(fieldDesc -> {
249+
NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
250+
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
251+
SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
252+
SortDirection.ASC : SortDirection.DESC;
253+
sortOrderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
254+
});
255+
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOrderBuilder.build()));
256+
}
257+
}
227258
} catch (Exception e) {
228-
LOG.warn("Can not read write order json: {}", sortOderJSONString, e);
229-
return;
230-
}
231-
if (sortFields != null && !sortFields.getSortFields().isEmpty()) {
232-
SortOrder.Builder sortOderBuilder = SortOrder.builderFor(schema);
233-
sortFields.getSortFields().forEach(fieldDesc -> {
234-
NullOrder nullOrder = fieldDesc.getNullOrdering() == NullOrdering.NULLS_FIRST ?
235-
NullOrder.NULLS_FIRST : NullOrder.NULLS_LAST;
236-
SortDirection sortDirection = fieldDesc.getDirection() == SortFieldDesc.SortDirection.ASC ?
237-
SortDirection.ASC : SortDirection.DESC;
238-
sortOderBuilder.sortBy(fieldDesc.getColumnName(), sortDirection, nullOrder);
239-
});
240-
properties.put(TableProperties.DEFAULT_SORT_ORDER, SortOrderParser.toJson(sortOderBuilder.build()));
259+
LOG.warn("Can not read write order json: {}", sortOrderJSONString, e);
241260
}
242261
}
243262
}
244263

264+
/**
265+
* Configures the Z-order sort order metadata in the given properties
266+
* based on the specified Z-order fields.
267+
*
268+
* @param zOrderFields the ZorderFields containing columns for Z-order sorting
269+
* @param properties the Properties object to store sort order metadata
270+
*/
271+
private void setZOrderSortOrder(ZorderFields zOrderFields, Properties properties) {
272+
List<String> columnNames = zOrderFields.getZOrderFields().stream()
273+
.map(ZOrderFieldDesc::getColumnName)
274+
.collect(Collectors.toList());
275+
276+
LOG.info("Setting Z-order sort order for columns: {}", columnNames);
277+
278+
properties.put("sort.order", "ZORDER");
279+
properties.put("sort.columns", String.join(",", columnNames));
280+
281+
LOG.info("Z-order sort order configured for Iceberg table with columns: {}", columnNames);
282+
}
283+
284+
private boolean isZOrderJSON(String jsonString) {
285+
return jsonString.contains("zorderFields");
286+
}
287+
288+
245289
@Override
246290
public void rollbackCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
247291
// do nothing

iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
import org.apache.hadoop.hive.ql.ddl.table.create.like.CreateTableLikeDesc;
8383
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
8484
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
85+
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
8586
import org.apache.hadoop.hive.ql.exec.Utilities;
8687
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
8788
import org.apache.hadoop.hive.ql.io.IOConstants;
@@ -119,6 +120,7 @@
119120
import org.apache.hadoop.hive.ql.session.SessionState;
120121
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
121122
import org.apache.hadoop.hive.ql.stats.Partish;
123+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
122124
import org.apache.hadoop.hive.ql.util.NullOrdering;
123125
import org.apache.hadoop.hive.serde2.AbstractSerDe;
124126
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -184,6 +186,7 @@
184186
import org.apache.iceberg.mr.InputFormatConfig;
185187
import org.apache.iceberg.mr.hive.actions.HiveIcebergDeleteOrphanFiles;
186188
import org.apache.iceberg.mr.hive.plan.IcebergBucketFunction;
189+
import org.apache.iceberg.mr.hive.udf.GenericUDFIcebergZorder;
187190
import org.apache.iceberg.puffin.Blob;
188191
import org.apache.iceberg.puffin.BlobMetadata;
189192
import org.apache.iceberg.puffin.Puffin;
@@ -929,9 +932,64 @@ public DynamicPartitionCtx createDPContext(
929932
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table));
930933
}
931934

935+
// Even if table has no explicit sort order, honor z-order if configured
936+
Map<String, String> props = table.properties();
937+
if ("ZORDER".equalsIgnoreCase(props.getOrDefault("sort.order", ""))) {
938+
createZOrderCustomSort(props, dpCtx, table, hmsTable, writeOperation);
939+
}
940+
932941
return dpCtx;
933942
}
934943

944+
/**
945+
* Adds a custom sort expression to the DynamicPartitionCtx that performs local Z-ordering on write.
946+
*
947+
* Behavior:
948+
* - Reads Z-order properties from 'sort.order' and 'sort.columns' (comma-separated).
949+
* - Resolves the referenced columns to their positions in the physical row (taking into account
950+
* ACID virtual columns offset for overwrite/update operations).
951+
* - Configures a single ASC sort key with NULLS FIRST and injects a custom key expression for
952+
* Z-order
953+
*/
954+
private void createZOrderCustomSort(Map<String, String> props, DynamicPartitionCtx dpCtx, Table table,
955+
org.apache.hadoop.hive.ql.metadata.Table hmsTable, Operation writeOperation) {
956+
String colsProp = props.get("sort.columns");
957+
if (StringUtils.isNotBlank(colsProp)) {
958+
List<String> zCols = Arrays.stream(colsProp.split(",")).map(String::trim)
959+
.filter(s -> !s.isEmpty()).collect(Collectors.toList());
960+
961+
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
962+
List<Types.NestedField> fields = table.schema().columns();
963+
for (int i = 0; i < fields.size(); ++i) {
964+
fieldOrderMap.put(fields.get(i).name(), i);
965+
}
966+
int offset = (shouldOverwrite(hmsTable, writeOperation) ?
967+
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();
968+
969+
List<Integer> zIndices = zCols.stream().map(col -> {
970+
Integer base = fieldOrderMap.get(col);
971+
Preconditions.checkArgument(base != null, "Z-order column not found in schema: %s", col);
972+
return base + offset;
973+
}).collect(Collectors.toList());
974+
975+
dpCtx.setCustomSortOrder(Lists.newArrayList(Collections.singletonList(1)));
976+
dpCtx.setCustomSortNullOrder(Lists.newArrayList(Collections.singletonList(NullOrdering.NULLS_FIRST.getCode())));
977+
978+
dpCtx.addCustomSortExpressions(Collections.singletonList(allCols -> {
979+
List<ExprNodeDesc> args = Lists.newArrayListWithExpectedSize(zIndices.size());
980+
for (Integer idx : zIndices) {
981+
args.add(allCols.get(idx));
982+
}
983+
try {
984+
GenericUDF udf = new GenericUDFIcebergZorder();
985+
return ExprNodeGenericFuncDesc.newInstance(udf, "iceberg_zorder", args);
986+
} catch (UDFArgumentException e) {
987+
throw new RuntimeException(e);
988+
}
989+
}));
990+
}
991+
}
992+
935993
private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
936994
Operation writeOperation, DynamicPartitionCtx dpCtx,
937995
List<TransformSpec> transformSpecs) {
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.mr.hive.udf;
21+
22+
import java.nio.ByteBuffer;
23+
import java.nio.charset.StandardCharsets;
24+
import org.apache.hadoop.hive.ql.exec.Description;
25+
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
26+
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
27+
import org.apache.hadoop.hive.ql.metadata.HiveException;
28+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
29+
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
30+
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
31+
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
32+
import org.apache.hadoop.io.BytesWritable;
33+
import org.apache.iceberg.util.ZOrderByteUtils;
34+
35+
/**
36+
* Hive UDF to compute the Z-order value of given input columns using Iceberg's ZOrderByteUtils.
37+
* Supports various primitive types and converts inputs into interleaved binary representation.
38+
*/
39+
@Description(name = "iceberg_zorder",
40+
value = "_FUNC_(value) - " +
41+
"Returns the z-value calculated by Iceberg ZOrderByteUtils class")
42+
public class GenericUDFIcebergZorder extends GenericUDF {
43+
private PrimitiveObjectInspector[] argOIs;
44+
// For variable-length types (e.g., strings), how many bytes contribute to z-order
45+
private final int varLengthContribution = 8;
46+
private transient ByteBuffer[] reUseBuffer;
47+
private static final int MAX_OUTPUT_SIZE = Integer.MAX_VALUE;
48+
49+
/**
50+
* Initializes the UDF, validating argument types are primitives and preparing buffers.
51+
*/
52+
@Override
53+
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
54+
if (arguments.length < 2) {
55+
throw new UDFArgumentException("iceberg_zorder requires at least 2 arguments");
56+
}
57+
argOIs = new PrimitiveObjectInspector[arguments.length];
58+
reUseBuffer = new ByteBuffer[arguments.length];
59+
for (int i = 0; i < arguments.length; i++) {
60+
if (!(arguments[i] instanceof PrimitiveObjectInspector poi)) {
61+
throw new UDFArgumentTypeException(i, "Only primitive types supported for z-order");
62+
}
63+
argOIs[i] = poi;
64+
}
65+
return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector;
66+
}
67+
68+
/**
69+
* Evaluates the UDF by converting input values to ordered bytes, interleaving them,
70+
* and returning the resulting Z-order binary value.
71+
*/
72+
@Override
73+
public Object evaluate(DeferredObject[] arguments) throws HiveException {
74+
byte[][] inputs = new byte[arguments.length][];
75+
int totalLength = 0;
76+
77+
for (int i = 0; i < arguments.length; i++) {
78+
byte[] orderedBytes = convertToOrderedBytes(arguments[i].get(), argOIs[i], i);
79+
inputs[i] = orderedBytes;
80+
totalLength += orderedBytes.length;
81+
}
82+
83+
int outputLength = Math.min(totalLength, MAX_OUTPUT_SIZE);
84+
ByteBuffer buffer = ByteBuffer.allocate(outputLength);
85+
86+
byte[] interleaved = ZOrderByteUtils.interleaveBits(inputs, outputLength, buffer);
87+
return new BytesWritable(interleaved);
88+
}
89+
90+
@Override
91+
public String getDisplayString(String[] children) {
92+
return "iceberg_zorder(" + String.join(", ", children) + ")";
93+
}
94+
95+
/**
96+
* Converts a single input value to its ordered byte representation based on type.
97+
* @return fixed-length byte arrays to be used in interleaving.
98+
*/
99+
private byte[] convertToOrderedBytes(Object value, PrimitiveObjectInspector oi,
100+
int position) throws HiveException {
101+
if (value == null) {
102+
// For NULL values, we have primitive buffer size of 8 with values of 0
103+
return ByteBuffer.wrap(new byte[ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE]).array();
104+
}
105+
106+
if (reUseBuffer[position] == null) {
107+
reUseBuffer[position] = ByteBuffer.allocate(ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE);
108+
}
109+
switch (oi.getPrimitiveCategory()) {
110+
case BOOLEAN:
111+
boolean boolValue = (Boolean) oi.getPrimitiveJavaObject(value);
112+
return ZOrderByteUtils.intToOrderedBytes(boolValue ? 1 : 0, reUseBuffer[position]).array();
113+
114+
case BYTE:
115+
byte byteValue = (Byte) oi.getPrimitiveJavaObject(value);
116+
return ZOrderByteUtils.tinyintToOrderedBytes(byteValue, reUseBuffer[position]).array();
117+
118+
case SHORT:
119+
short shortValue = (Short) oi.getPrimitiveJavaObject(value);
120+
return ZOrderByteUtils.shortToOrderedBytes(shortValue, reUseBuffer[position]).array();
121+
122+
case INT:
123+
int intValue = (Integer) oi.getPrimitiveJavaObject(value);
124+
return ZOrderByteUtils.intToOrderedBytes(intValue, reUseBuffer[position]).array();
125+
126+
case LONG:
127+
long longValue = (Long) oi.getPrimitiveJavaObject(value);
128+
return ZOrderByteUtils.longToOrderedBytes(longValue, reUseBuffer[position]).array();
129+
130+
case FLOAT:
131+
float floatValue = (Float) oi.getPrimitiveJavaObject(value);
132+
return ZOrderByteUtils.floatToOrderedBytes(floatValue, reUseBuffer[position]).array();
133+
134+
case DOUBLE:
135+
double doubleValue = (Double) oi.getPrimitiveJavaObject(value);
136+
return ZOrderByteUtils.doubleToOrderedBytes(doubleValue, reUseBuffer[position]).array();
137+
138+
case DATE:
139+
// Get data in epoch seconds and convert it to long
140+
Object dateValue = oi.getPrimitiveJavaObject(value);
141+
long dateInSeconds;
142+
if (dateValue instanceof java.sql.Date dd) {
143+
dateInSeconds = dd.getTime() / 1000L;
144+
} else if (dateValue instanceof org.apache.hadoop.hive.common.type.Date dd) {
145+
dateInSeconds = dd.toEpochSecond();
146+
} else {
147+
throw new HiveException("Unsupported DATE backing type: " + dateValue.getClass());
148+
}
149+
return ZOrderByteUtils.longToOrderedBytes(dateInSeconds, reUseBuffer[position]).array();
150+
151+
case TIMESTAMP:
152+
Object tsValue = oi.getPrimitiveJavaObject(value);
153+
long tsInSeconds;
154+
if (tsValue instanceof org.apache.hadoop.hive.common.type.Timestamp ts) {
155+
tsInSeconds = ts.toEpochSecond();
156+
} else if (tsValue instanceof java.sql.Timestamp ts) {
157+
tsInSeconds = ts.getTime() / 1000L;
158+
} else {
159+
throw new HiveException("Unsupported TIMESTAMP backing type: " + tsValue.getClass());
160+
}
161+
return ZOrderByteUtils.longToOrderedBytes(tsInSeconds, reUseBuffer[position]).array();
162+
163+
case CHAR:
164+
case VARCHAR:
165+
case STRING:
166+
String strVal = String.valueOf(oi.getPrimitiveJavaObject(value));
167+
return ZOrderByteUtils.stringToOrderedBytes(strVal, varLengthContribution,
168+
reUseBuffer[position], StandardCharsets.UTF_8.newEncoder()).array();
169+
170+
default:
171+
throw new HiveException("Unsupported type in z-order: " + oi.getPrimitiveCategory());
172+
}
173+
}
174+
}

0 commit comments

Comments (0)