Skip to content

Commit adf7132

Browse files
[Enhancement] add key_metadata field in the iceberg distributed plan (StarRocks#45426)
Signed-off-by: stephen <[email protected]>
1 parent: cadf909 · commit: adf7132

File tree

7 files changed

+21
-7
lines changed

7 files changed

+21
-7
lines changed

be/src/runtime/metadata_result_writer.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ StatusOr<TFetchDataResultPtr> MetadataResultWriter::_process_chunk(Chunk* chunk)
145145
// 10 -> "file_sequence_number"
146146
// 11 -> "data_sequence_number"
147147
// 12 -> "column_stats"
148+
// 13 -> "key_metadata"
148149
Status MetadataResultWriter::_fill_iceberg_metadata(const Columns& columns, const Chunk* chunk,
149150
TFetchDataResult* result) const {
150151
SCOPED_TIMER(_serialize_timer);
@@ -163,6 +164,7 @@ Status MetadataResultWriter::_fill_iceberg_metadata(const Columns& columns, cons
163164
auto file_sequence_number = down_cast<Int64Column*>(ColumnHelper::get_data_column(columns[10].get()));
164165
auto data_sequence_number = down_cast<Int64Column*>(ColumnHelper::get_data_column(columns[11].get()));
165166
auto iceberg_metrics = down_cast<BinaryColumn*>(ColumnHelper::get_data_column(columns[12].get()));
167+
auto key_metadata = down_cast<BinaryColumn*>(ColumnHelper::get_data_column(columns[13].get()));
166168

167169
std::vector<TMetadataEntry> meta_entries;
168170
int num_rows = chunk->num_rows();
@@ -212,6 +214,9 @@ Status MetadataResultWriter::_fill_iceberg_metadata(const Columns& columns, cons
212214
if (!columns[12]->is_null(i)) {
213215
iceberg_meta.__set_column_stats(iceberg_metrics->get_slice(i).to_string());
214216
}
217+
if (!columns[13]->is_null(i)) {
218+
iceberg_meta.__set_key_metadata(key_metadata->get_slice(i).to_string());
219+
}
215220
}
216221

217222
result->result_batch.rows.resize(num_rows);

fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/IcebergMetadataCollectJob.java

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class IcebergMetadataCollectJob extends MetadataCollectJob {
3333
", file_sequence_number" + // BIGINT
3434
", data_sequence_number " + // BIGINT
3535
", column_stats " + // BINARY
36+
", key_metadata " + // BINARY
3637
"FROM `$catalogName`.`$dbName`.`$tableName$logical_iceberg_metadata` " +
3738
"FOR VERSION AS OF $snapshotId " +
3839
"WHERE $predicate'";

fe/fe-core/src/main/java/com/starrocks/connector/metadata/iceberg/LogicalIcebergMetadataTable.java

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public static LogicalIcebergMetadataTable create(String catalogName, String orig
6464
.column("file_sequence_number", ScalarType.createType(PrimitiveType.BIGINT))
6565
.column("data_sequence_number", ScalarType.createType(PrimitiveType.BIGINT))
6666
.column("column_stats", ScalarType.createType(PrimitiveType.VARBINARY))
67+
.column("key_metadata", ScalarType.createType(PrimitiveType.VARBINARY))
6768
.build(),
6869
originDb,
6970
originTable,

fe/fe-core/src/main/java/org/apache/iceberg/MetadataParser.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ private ContentFile<?> parseThriftToIcebergDataFile(TMetadataEntry entry) {
236236
// build equality field id
237237
int[] equalityFieldIds = thrift.isSetEquality_ids() ? ArrayUtil.toIntArray(thrift.getEquality_ids()) : null;
238238

239+
// build key metadata
240+
ByteBuffer keyMetadata = thrift.isSetKey_metadata() ? ByteBuffer.wrap(thrift.getKey_metadata()) : null;
241+
239242
BaseFile<?> baseFile;
240243
// TODO(stephen): add keyMetadata field
241244
if (content == FileContent.DATA) {
@@ -246,7 +249,7 @@ private ContentFile<?> parseThriftToIcebergDataFile(TMetadataEntry entry) {
246249
partitionData,
247250
fileLength,
248251
metrics,
249-
null,
252+
keyMetadata,
250253
splitOffsets,
251254
null);
252255
} else {
@@ -261,7 +264,7 @@ private ContentFile<?> parseThriftToIcebergDataFile(TMetadataEntry entry) {
261264
equalityFieldIds,
262265
sortId,
263266
splitOffsets,
264-
null
267+
keyMetadata
265268
);
266269
}
267270

fe/fe-core/src/test/java/com/starrocks/connector/iceberg/IcebergMetadataTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -1364,9 +1364,10 @@ org.apache.iceberg.Table getTable(String dbName, String tableName) throws StarRo
13641364
MetadataCollectJob collectJob = new IcebergMetadataCollectJob("iceberg_catalog", "db", "table",
13651365
TResultSinkType.METADATA_ICEBERG, snapshotId, "");
13661366
collectJob.init(starRocksAssert.getCtx().getSessionVariable());
1367-
String expectedSql = "SELECT content, file_path, file_format, spec_id, partition_data, record_count, " +
1368-
"file_size_in_bytes, split_offsets, sort_id, equality_ids, file_sequence_number, data_sequence_number , " +
1369-
"column_stats FROM `iceberg_catalog`.`db`.`table$logical_iceberg_metadata` FOR VERSION AS OF 1 WHERE 1=1'";
1367+
String expectedSql = "SELECT content, file_path, file_format, spec_id, partition_data, record_count," +
1368+
" file_size_in_bytes, split_offsets, sort_id, equality_ids, file_sequence_number," +
1369+
" data_sequence_number , column_stats , key_metadata FROM" +
1370+
" `iceberg_catalog`.`db`.`table$logical_iceberg_metadata` FOR VERSION AS OF 1 WHERE 1=1'";
13701371
Assert.assertEquals(expectedSql, collectJob.getSql());
13711372
Assert.assertNotNull(collectJob.getContext());
13721373
Assert.assertTrue(collectJob.getContext().isMetadataContext());

gensrc/thrift/Data.thrift

+1
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ struct TIcebergMetadata {
142142
11: optional i64 file_sequence_number
143143
12: optional i64 data_sequence_number
144144
13: optional binary column_stats;
145+
14: optional binary key_metadata;
145146
}
146147

147148
// Metadata data for metadata table

java-extensions/iceberg-metadata-reader/src/main/java/com/starrocks/connector/iceberg/IcebergMetadataScanner.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.iceberg.expressions.Expression;
3636
import org.apache.iceberg.expressions.Expressions;
3737
import org.apache.iceberg.io.CloseableIterator;
38-
import org.apache.iceberg.util.ByteBuffers;
3938
import org.apache.logging.log4j.LogManager;
4039
import org.apache.logging.log4j.Logger;
4140

@@ -46,6 +45,7 @@
4645
import java.util.Map;
4746
import java.util.stream.Collectors;
4847

48+
import static org.apache.iceberg.util.ByteBuffers.toByteArray;
4949
import static org.apache.iceberg.util.SerializationUtil.deserializeFromBase64;
5050

5151
public class IcebergMetadataScanner extends ConnectorScanner {
@@ -243,6 +243,8 @@ private Object get(String columnName, ContentFile<?> file) {
243243
return file.dataSequenceNumber();
244244
case "column_stats":
245245
return getIcebergMetrics(file);
246+
case "key_metadata":
247+
return file.keyMetadata() == null ? null : toByteArray(file.keyMetadata());
246248
default:
247249
throw new IllegalArgumentException("Unrecognized column name " + columnName);
248250
}
@@ -299,7 +301,7 @@ private Map<Integer, byte[]> convertByteBufferMap(Map<Integer, ByteBuffer> byteB
299301
return byteBufferMap.entrySet().stream()
300302
.collect(Collectors.toMap(
301303
Map.Entry::getKey,
302-
entry -> ByteBuffers.toByteArray(entry.getValue())));
304+
entry -> toByteArray(entry.getValue())));
303305
}
304306

305307
private void parseRequiredTypes() {

0 commit comments

Comments (0)