Conversation

unical1988 (Contributor):

Added ConversionSource for Parquet files

  • Partition format is specified by a user config.

public InternalSnapshot getCurrentSnapshot() {
List<InternalDataFile> internalDataFiles = getInternalDataFiles();
InternalTable table = getTable(-1L);
return InternalSnapshot.builder()
Contributor:

We'll need to set the version here. I am guessing it should be the last modification time but I need to think it through more.
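For reference, a minimal sketch of what that could look like, assuming InternalSnapshot.builder() exposes a version(String) setter and that InternalDataFile carries a lastModified timestamp; using the latest modification time as the version is only the guess from the comment above, not a settled decision.

    public InternalSnapshot getCurrentSnapshot() {
      List<InternalDataFile> internalDataFiles = getInternalDataFiles();
      InternalTable table = getTable(-1L);
      // Use the newest data file modification time as a stand-in for the snapshot version.
      long latestModificationTime =
          internalDataFiles.stream()
              .mapToLong(InternalDataFile::getLastModified)
              .max()
              .orElse(0L);
      return InternalSnapshot.builder()
          .table(table)
          .version(String.valueOf(latestModificationTime))
          // partitionedDataFiles and other builder fields omitted in this sketch
          .build();
    }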


@Builder
// @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ParquetConversionSource implements ConversionSource<Long> {
Contributor:

For us to provide support for incremental sync, I think this may need to be a range instead of a singular point. That way we can filter the files by a start and end time when performing the sync.

Contributor:

@unical1988 this is related to the incremental sync paths. We'll want this source to have some time range to find the files that need to be synced. If we don't do this, we will need to limit this source to snapshot syncs.
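As a rough illustration of the range idea (not part of the PR), the file listing could filter by modification time between a sync start and end; the method name and the reliance on the file system's modification time are assumptions.

    // Uses org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator} and java.time.Instant.
    private List<LocatedFileStatus> getFilesModifiedBetween(
        FileSystem fs, Path basePath, Instant syncStart, Instant syncEnd) throws IOException {
      List<LocatedFileStatus> files = new ArrayList<>();
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(basePath, true);
      while (iterator.hasNext()) {
        LocatedFileStatus status = iterator.next();
        long modificationTime = status.getModificationTime();
        // Keep only files written inside the requested sync window [syncStart, syncEnd).
        if (modificationTime >= syncStart.toEpochMilli() && modificationTime < syncEnd.toEpochMilli()) {
          files.add(status);
        }
      }
      return files;
    }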

… PartitionFieldSpec + no file diffs removed files
ParquetMetadata parquetMetadata =
parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());

List<InternalPartitionField> partitionFields =
Contributor:

The ParquetPartitionSpecExtractor should be used here. The spec extractor is used for defining which fields are used and the ParquetPartitionValueExtractor should be used to get the values for the partition fields per file.

Contributor Author (unical1988), Jul 23, 2025:

In this case, is partitionValueExtractor.extractParquetPartitions() useless? As a matter of fact, it is called further down in the code to extract the partition fields (should the other calls be replaced as well?). Also, should ParquetPartitionValueExtractor have a constructor taking a ParquetPartitionSpecExtractor as a parameter, or should it use one through an instance inside ParquetPartitionValueExtractor?

Contributor:

The spec of the partitions is different from the values of the partitions. When you define a table, you define the partitioning of that table as a whole. For example, you would describe the table as "partitioned on date with day granularity" instead of "partition is 2025-07-29." The extractParquetPartitions is still required for attaching the partition values to the files.
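To illustrate the distinction with XTable's intermediate model (builder field names here are from memory and may differ slightly): the spec describes the table as a whole, the value describes a single file.

    // Spec: the table is partitioned on the `date` field with DAY granularity.
    InternalPartitionField spec =
        InternalPartitionField.builder()
            .sourceField(dateField) // an InternalField for the `date` column
            .transformType(PartitionTransformType.DAY)
            .build();

    // Value: this particular file belongs to the 2025-07-29 partition.
    PartitionValue value =
        PartitionValue.builder()
            .partitionField(spec)
            .range(Range.scalar("2025-07-29"))
            .build();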

@@ -141,7 +140,7 @@ private static Stream<Arguments> testCasesWithPartitioningAndSyncModes() {

private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPartitioning() {
List<Arguments> arguments = new ArrayList<>();
-    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PARQUET)) {
Contributor:

This will likely cause some issues since we don't have a concept for updates or deletes in the parquet source. It is assumed to be append-only. We can instead set up a new test in this class that creates parquet tables, converts them to all 3 other table formats, and validates the data using the existing helpers.

Contributor Author (unical1988):

I notice you start the tests by creating a SparkSession; can we leverage it for creating the parquet file to convert into the 3 other formats? Is a call to conversionController.sync(conversionConfig, conversionSourceProvider) sufficient to test the conversion? Do you validate the converted data through checkDatasetEquivalence() (all in ITConversionController.java)?

Contributor:

Yes, we can use the Spark session, and your understanding of the methods is correct.
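A rough sketch of such a test, assuming a JUnit @TempDir, the shared SparkSession, and a hypothetical buildConversionConfig helper modeled on the existing config setup in ITConversionController:

    @Test
    void testParquetSourceSnapshotSync() {
      String tableName = "parquet_source_table";
      String basePath = tempDir.resolve(tableName).toString();
      // Write a plain, append-only parquet dataset with the shared SparkSession.
      sparkSession
          .range(0, 100)
          .withColumn("part_col", functions.pmod(functions.col("id"), functions.lit(10)))
          .write()
          .partitionBy("part_col")
          .parquet(basePath);
      // Sync the parquet source to the three other formats.
      ConversionConfig conversionConfig = buildConversionConfig(tableName, basePath); // hypothetical helper
      conversionController.sync(conversionConfig, conversionSourceProvider);
      // Validate the targets against the source with checkDatasetEquivalence(), as in the other tests.
    }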

@@ -27,6 +27,7 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
public static final String PARQUET = "PARQUET";

public static String[] values() {
return new String[] {"HUDI", "ICEBERG", "DELTA"};
Contributor:

Should PARQUET be added here?

Contributor Author (unical1988):

It was added here so that TableFormat.PARQUET is defined, but for the old tests to pass it needs to be left out of values().

}

@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
Contributor:

This looks like we're just checking if there are files older than the provided instant. Is this because we are assuming the files may be removed from the source table?

Contributor Author (unical1988):

Is there another way to think of it? How would you approach it otherwise?


writeStatus.setStat(writeStat);
return writeStatus;
}

private Map<String, HoodieColumnRangeMetadata<Comparable>> convertColStats(
-      String fileName, List<ColumnStat> columnStatMap) {
+      String fileName, List<ColumnStat> columnStatMap, String fileFormat) {
Contributor:

We cannot have this be dependent on fileFormat as mentioned before. The intermediate object should be standardized so it is not dependent on source format

Contributor Author (unical1988):

It isn't; I forgot to remove the parameter.

Contributor Author (unical1988):

Also, please check with me the current CI error in TestDeltaSync and TestIcebergSync; a tiny error must be in a certain path.

schema,
dataFile.getRecordCount(),
dataFile.getColumnStats(),
dataFile.getFileFormat().toString()))
Contributor:

This needs to be cleaned up as well. The change in expected args is causing the mocks to no longer match and that is causing the test failures for Iceberg.

Contributor Author (unical1988):

You're right, I forgot to revert the changes here too; now done.

@@ -50,7 +50,8 @@ public static IcebergColumnStatsConverter getInstance() {
return INSTANCE;
}

-  public Metrics toIceberg(Schema schema, long totalRowCount, List<ColumnStat> fieldColumnStats) {
+  public Metrics toIceberg(
Contributor:

The changes to this file and others that are just formatting in the Iceberg and Delta paths should be reverted to minimize the diff to just the parts that are essential for review.

import org.apache.xtable.hudi.HudiTestUtil;
import org.apache.xtable.model.sync.SyncMode;

public class TestParquetConversionSource {
Contributor:

We will need to either move this to run in the integration tests by changing Test to IT or it needs to set @Execution(SAME_THREAD) so that it will not try to start a spark session in the same JVM as the other tests like TestDeltaSync. This looks like the reason why TestDeltaSync will fail when run as part of the full suite but not when run in isolation.
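For the second option, the JUnit 5 parallel-execution annotation would look like this on the test class:

    import org.junit.jupiter.api.parallel.Execution;
    import org.junit.jupiter.api.parallel.ExecutionMode;

    @Execution(ExecutionMode.SAME_THREAD)
    public class TestParquetConversionSource {
      // existing tests unchanged; they now share one thread with the other Spark-based tests
    }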

Contributor Author (unical1988):

Yes, renaming the test class with the IT prefix solved it.

.dataType(InternalType.RECORD)
.fields(subFields)
-        .isNullable(isNullable(schema.asGroupType()))
+        .isNullable(
+            isNullable(schema.asGroupType())) // false isNullable(schema.asGroupType()) (TODO causing
Contributor:

Right now this is returning that the top level record schema is nullable which should not be the case. Do we need some special handling for this case?

Contributor Author (unical1988):

As is, it does not seem to cause any error

Contributor:

This is causing errors when syncing to Hudi since it is sending a union of null and the actual schema to the target instead of simply the record schema.

Contributor Author (unical1988):

I will recheck, but the CI does not fail on that.

Contributor Author (unical1988):

It does indeed seem that this requires special handling when the passed schema is the top-level record.
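One possible shape for that special handling, assuming the parquet MessageType is what reaches this method for the top-level schema (illustrative only):

    // The root record is never wrapped in a nullable union; nested groups keep the existing check.
    boolean nullable = !(schema instanceof MessageType) && isNullable(schema.asGroupType());
    // ... then pass `nullable` into the existing builder chain instead of calling the check inline:
    //     .isNullable(nullable)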

Comment on lines 122 to 147
                columnMetaData.getPrimitiveType().getPrimitiveTypeName()
                        == PrimitiveType.PrimitiveTypeName.BINARY // TODO how about DECIMAL, JSON, BSON and ENUM logicalTypes?
                    ? columnMetaData.getPrimitiveType().getLogicalTypeAnnotation() != null
                        ? columnMetaData.getPrimitiveType().getLogicalTypeAnnotation().toString().equals("STRING")
                            ? new String(
                                ((Binary) columnMetaData.getStatistics().genericGetMin()).getBytes(),
                                StandardCharsets.UTF_8)
                            : columnMetaData.getStatistics().genericGetMin()
                        : columnMetaData.getStatistics().genericGetMin()
                    : columnMetaData.getStatistics().genericGetMin(), // if stats are string convert to litteraly a string stat and
Contributor:

Let's move this conversion logic into a helper method and then add a new GH Issue for handling any other logical types.

Contributor Author (unical1988):

Ok
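A sketch of what the extracted helper could look like, reusing exactly the calls from the inline ternary above; the method name and its exact placement are assumptions:

    private static Object convertBinaryStatIfString(ColumnChunkMetaData columnMetaData, Object statValue) {
      PrimitiveType primitiveType = columnMetaData.getPrimitiveType();
      LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
      if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY
          && logicalType != null
          && "STRING".equals(logicalType.toString())) {
        // BINARY columns annotated as STRING are decoded to a Java String; other logical types
        // (DECIMAL, JSON, BSON, ENUM) are left to the follow-up GitHub issue.
        return new String(((Binary) statValue).getBytes(), StandardCharsets.UTF_8);
      }
      return statValue;
    }

The call site then becomes convertBinaryStatIfString(columnMetaData, columnMetaData.getStatistics().genericGetMin()), and the same helper can be reused for the max value.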

partitionValueExtractor.extractSchemaForParquetPartitions(
parquetMetadataExtractor.readParquetMetadata(hadoopConf, file.getPath()),
file.getPath().toString())),
parentPath.toString());
} catch (java.io.IOException e) {
Contributor:

I think this is just hiding the exception currently and we need to fix this.

Contributor Author (unical1988):

How is it hiding the exception? Which exception?

Contributor:

It catches the exception but does not throw any new exception. The user will not know of the errors.
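For example, rethrowing keeps the failure visible to the caller; ReadException is assumed here to be the project's unchecked wrapper for source read errors (any RuntimeException subclass would serve):

    } catch (java.io.IOException e) {
      // Do not swallow the failure: wrap and rethrow so the sync surfaces the error to the user.
      throw new ReadException("Unable to read parquet metadata for " + file.getPath(), e);
    }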

@@ -188,6 +306,10 @@ private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTab
throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
}
}
/*
test for Parquet file conversion
Contributor:

Let's remove these changes then if they are not required

Comment on lines 32 to 34
public static String getPartitionPathValue(Path tableBasePath, Path filePath) {
return getPartitionPath(tableBasePath, filePath).split("=")[1];
}
Contributor:

What if there are multiple = in the path? Like for /some/path/year=2025/month=08/

Contributor Author (unical1988):

Nested partitions are not handled by that helper; it will need some tweaking.
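A possible tweak for multi-level Hive-style paths such as year=2025/month=08 (the method name and its home in a parquet utils class are assumptions):

    public static Map<String, String> getPartitionValues(Path tableBasePath, Path filePath) {
      Map<String, String> values = new LinkedHashMap<>();
      String relativePath = getPartitionPath(tableBasePath, filePath); // e.g. "year=2025/month=08"
      for (String segment : relativePath.split("/")) {
        int separator = segment.indexOf('=');
        if (separator > 0) {
          // Preserve directory order: partition column name -> raw value from the path.
          values.put(segment.substring(0, separator), segment.substring(separator + 1));
        }
      }
      return values;
    }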

Contributor:

Also this should be moved to the parquet utils directly since Hudi has handling for multiple partition fields already

Contributor Author (unical1988), Aug 25, 2025:

Or can I use the Hudi logic for that in parquet?

Contributor Author (unical1988), Aug 25, 2025:

Btw, since the source field name (timestamp) is different from the partition column (year), I am not sure how the Hudi logic extracts the partition values from the path?

Contributor Author (unical1988), Aug 25, 2025:

Note that I cannot set sourceField to the year column, since the year column is not part of the schema, which makes the Hudi logic for extracting partition values from the path unusable.

Contributor:

We use the user-provided partition spec for Hudi to determine the mapping of a field in the data to the partition path pattern. For example, a field in the data called ts is mapped to year=YYYY/month=MM/day=DD with a config ts:DAY:year=YYYY/month=MM/day=DD.
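Illustratively, such a spec string splits into three parts; splitting with a limit keeps any '=' characters inside the path pattern intact (this is just a reading of the config format described above, not the actual parser):

    String spec = "ts:DAY:year=YYYY/month=MM/day=DD";
    String[] parts = spec.split(":", 3);
    String sourceField = parts[0];  // "ts" (the field in the data)
    String granularity = parts[1];  // "DAY"
    String pathPattern = parts[2];  // "year=YYYY/month=MM/day=DD"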

Contributor Author (unical1988):

The current (Hudi) partitioning logic (reflected in PathBasedPartitionValuesExtractor) rather allows for a config in the following form: ts:DAY:yyyy-mm-dd

Contributor Author (unical1988):

I changed the partitioning for parquet while keeping functions from Hudi; I think it will be able to deal with nested partitions.


import org.apache.xtable.GenericTable;

public class TestSparkParquetTable implements GenericTable<Group, String> {
Contributor:

Is this being used by the tests?

Contributor Author (unical1988):

Yes
