diff --git a/core/src/test/java/io/onetable/GenericTable.java b/core/src/test/java/io/onetable/GenericTable.java
index 0e2f2807d..75b94100c 100644
--- a/core/src/test/java/io/onetable/GenericTable.java
+++ b/core/src/test/java/io/onetable/GenericTable.java
@@ -48,6 +48,10 @@ public interface GenericTable extends AutoCloseable {
 
   String getBasePath();
 
+  default String getDataPath() {
+    return getBasePath();
+  }
+
   String getOrderByColumn();
 
   void close();
@@ -72,6 +76,9 @@ static GenericTable getInstance(
       case DELTA:
         return TestSparkDeltaTable.forStandardSchemaAndPartitioning(
             tableName, tempDir, sparkSession, isPartitioned ? "level" : null);
+      case ICEBERG:
+        return TestIcebergTable.forStandardSchemaAndPartitioning(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
       default:
         throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
     }
@@ -91,6 +98,9 @@ static GenericTable getInstanceWithAdditionalColumns(
       case DELTA:
         return TestSparkDeltaTable.forSchemaWithAdditionalColumnsAndPartitioning(
             tableName, tempDir, sparkSession, isPartitioned ? "level" : null);
+      case ICEBERG:
+        return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
       default:
         throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
     }
diff --git a/core/src/test/java/io/onetable/ITOneTableClient.java b/core/src/test/java/io/onetable/ITOneTableClient.java
index 1d66a84af..6eb58b757 100644
--- a/core/src/test/java/io/onetable/ITOneTableClient.java
+++ b/core/src/test/java/io/onetable/ITOneTableClient.java
@@ -48,7 +48,6 @@
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +62,7 @@
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 
@@ -77,6 +77,7 @@
 import io.onetable.hudi.HudiSourceClientProvider;
 import io.onetable.hudi.HudiSourceConfig;
 import io.onetable.hudi.HudiTestUtil;
+import io.onetable.iceberg.IcebergSourceClientProvider;
 import io.onetable.model.storage.TableFormat;
 import io.onetable.model.sync.SyncMode;
 
@@ -87,8 +88,6 @@ public class ITOneTableClient {
 
   private static JavaSparkContext jsc;
   private static SparkSession sparkSession;
-  private SourceClientProvider hudiSourceClientProvider;
-  private SourceClientProvider deltaSourceClientProvider;
 
   @BeforeAll
   public static void setupOnce() {
@@ -102,14 +101,6 @@ public static void setupOnce() {
     jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
   }
 
-  @BeforeEach
-  public void setup() {
-    hudiSourceClientProvider = new HudiSourceClientProvider();
-    hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
-    deltaSourceClientProvider = new DeltaSourceClientProvider();
-    deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
-  }
-
   @AfterAll
   public static void teardown() {
     if (jsc != null) {
@@ -126,7 +117,8 @@ private static Stream testCasesWithPartitioningAndSyncModes() {
 
   private static Stream generateTestParametersForFormatsSyncModesAndPartitioning() {
     List arguments = new ArrayList<>();
-    for (TableFormat sourceTableFormat : Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) {
+    for (TableFormat sourceTableFormat :
+        Arrays.asList(TableFormat.HUDI, TableFormat.DELTA, TableFormat.ICEBERG)) {
       for (SyncMode syncMode : SyncMode.values()) {
         for (boolean isPartitioned : new boolean[] {true, false}) {
           arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
@@ -142,9 +134,18 @@ private static Stream testCasesWithSyncModes() {
 
   private SourceClientProvider getSourceClientProvider(TableFormat sourceTableFormat) {
     if (sourceTableFormat == TableFormat.HUDI) {
+      SourceClientProvider hudiSourceClientProvider = new HudiSourceClientProvider();
+      hudiSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
       return hudiSourceClientProvider;
     } else if (sourceTableFormat == TableFormat.DELTA) {
+      SourceClientProvider deltaSourceClientProvider = new DeltaSourceClientProvider();
+      deltaSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
       return deltaSourceClientProvider;
+    } else if (sourceTableFormat == TableFormat.ICEBERG) {
+      SourceClientProvider icebergSourceClientProvider =
+          new IcebergSourceClientProvider();
+      icebergSourceClientProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap());
+      return icebergSourceClientProvider;
     } else {
       throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
     }
@@ -183,6 +184,7 @@ public void testVariousOperations(
               .tableName(tableName)
               .targetTableFormats(targetTableFormats)
               .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
               .hudiSourceConfig(
                   HudiSourceConfig.builder()
                       .partitionFieldSpecConfig(oneTablePartitionConfig)
@@ -215,6 +217,7 @@ public void testVariousOperations(
               .tableName(tableName)
               .targetTableFormats(targetTableFormats)
               .tableBasePath(tableWithUpdatedSchema.getBasePath())
+              .tableDataPath(tableWithUpdatedSchema.getDataPath())
               .hudiSourceConfig(
                   HudiSourceConfig.builder()
                       .partitionFieldSpecConfig(oneTablePartitionConfig)
@@ -254,6 +257,7 @@ public void testVariousOperations(
   public void testConcurrentInsertWritesInSource(
       SyncMode syncMode, PartitionConfig partitionConfig) {
     String tableName = getTableName();
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     List targetTableFormats = getOtherFormats(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
@@ -279,11 +283,11 @@ public void testConcurrentInsertWritesInSource(
               .syncMode(syncMode)
               .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);
 
       table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 100);
     }
   }
@@ -293,7 +297,7 @@ public void testConcurrentInsertWritesInSource(
   public void testConcurrentInsertsAndTableServiceWrites(
       SyncMode syncMode, PartitionConfig partitionConfig) {
     HoodieTableType tableType = HoodieTableType.MERGE_ON_READ;
-
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     List targetTableFormats = getOtherFormats(TableFormat.HUDI);
     String tableName = getTableName();
     try (TestSparkHudiTable table =
@@ -313,7 +317,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
               .syncMode(syncMode)
               .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 50);
 
       table.deleteRecords(insertedRecords1.subList(0, 20), true);
@@ -321,7 +325,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
       String scheduledCompactionInstant = table.onlyScheduleCompaction();
 
       table.insertRecords(50, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       Map sourceHudiOptions =
           Collections.singletonMap("hoodie.datasource.query.type", "read_optimized");
       // Because compaction is not completed yet and read optimized query, there are 100 records.
@@ -334,7 +338,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
           100);
 
       table.insertRecords(50, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // Because compaction is not completed yet and read optimized query, there are 150 records.
       checkDatasetEquivalence(
           TableFormat.HUDI,
@@ -345,7 +349,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
           150);
 
       table.completeScheduledCompaction(scheduledCompactionInstant);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       checkDatasetEquivalence(TableFormat.HUDI, table, targetTableFormats, 130);
     }
   }
@@ -353,7 +357,7 @@ public void testConcurrentInsertsAndTableServiceWrites(
   @ParameterizedTest
   @EnumSource(
       value = TableFormat.class,
-      names = {"HUDI", "DELTA"})
+      names = {"HUDI", "DELTA", "ICEBERG"})
   public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exception {
     String tableName = getTableName();
     try (GenericTable table =
@@ -365,6 +369,7 @@ public void testTimeTravelQueries(TableFormat sourceTableFormat) throws Exceptio
               .tableName(tableName)
               .targetTableFormats(targetTableFormats)
               .tableBasePath(table.getBasePath())
+              .tableDataPath(table.getDataPath())
               .syncMode(SyncMode.INCREMENTAL)
               .build();
       SourceClientProvider sourceClientProvider = getSourceClientProvider(sourceTableFormat);
@@ -462,6 +467,7 @@ public void testPartitionedData(
       String hudiPartitionConfig,
       String filter) {
     String tableName = getTableName();
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, hudiPartitionConfig, HoodieTableType.COPY_ON_WRITE)) {
@@ -478,10 +484,10 @@ public void testPartitionedData(
               .build();
       table.insertRecords(100, true);
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // Do a second sync to force the test to read back the metadata it wrote earlier
       table.insertRecords(100, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
 
       checkDatasetEquivalenceWithFilter(TableFormat.HUDI, table, targetTableFormats, filter);
     }
@@ -491,6 +497,7 @@ public void testPartitionedData(
   @EnumSource(value = SyncMode.class)
   public void testSyncWithSingleFormat(SyncMode syncMode) {
     String tableName = getTableName();
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -513,18 +520,18 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
               .build();
 
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
-      oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 100);
-      oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 100);
 
       table.insertRecords(100, true);
-      oneTableClient.sync(perTableConfigIceberg, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigIceberg, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);
-      oneTableClient.sync(perTableConfigDelta, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfigDelta, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.DELTA), 200);
     }
@@ -533,6 +540,7 @@ public void testSyncWithSingleFormat(SyncMode syncMode) {
   @Test
   public void testOutOfSyncIncrementalSyncs() {
     String tableName = getTableName();
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -555,13 +563,13 @@ public void testOutOfSyncIncrementalSyncs() {
       table.insertRecords(50, true);
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
       // sync iceberg only
-      oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(singleTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 50);
       // insert more records
       table.insertRecords(50, true);
       // iceberg will be an incremental sync and delta will need to bootstrap with snapshot sync
-      oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(dualTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 100);
 
@@ -570,14 +578,14 @@ public void testOutOfSyncIncrementalSyncs() {
       // insert more records
       table.insertRecords(50, true);
       // incremental sync for two commits for iceberg only
-      oneTableClient.sync(singleTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(singleTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Collections.singletonList(TableFormat.ICEBERG), 200);
 
       // insert more records
       table.insertRecords(50, true);
       // incremental sync for one commit for iceberg and three commits for delta
-      oneTableClient.sync(dualTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(dualTableConfig, sourceClientProvider);
       checkDatasetEquivalence(
           TableFormat.HUDI, table, Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), 250);
     }
@@ -586,6 +594,7 @@ public void testOutOfSyncIncrementalSyncs() {
   @Test
   public void testMetadataRetention() {
     String tableName = getTableName();
+    SourceClientProvider sourceClientProvider = getSourceClientProvider(TableFormat.HUDI);
     try (TestJavaHudiTable table =
         TestJavaHudiTable.forStandardSchema(
             tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
@@ -599,7 +608,7 @@ public void testMetadataRetention() {
               .build();
       OneTableClient oneTableClient = new OneTableClient(jsc.hadoopConfiguration());
       table.insertRecords(10, true);
-      oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+      oneTableClient.sync(perTableConfig, sourceClientProvider);
       // later we will ensure we can still read the source table at this instant to ensure that
       // neither target cleaned up the underlying parquet files in the table
       Instant instantAfterFirstCommit = Instant.now();
@@ -608,7 +617,7 @@ public void testMetadataRetention() {
           .forEach(
               unused -> {
                 table.insertRecords(10, true);
-                oneTableClient.sync(perTableConfig, hudiSourceClientProvider);
+                oneTableClient.sync(perTableConfig, sourceClientProvider);
               });
       // ensure that hudi rows can still be read and underlying files were not removed
       List rows =
@@ -729,7 +738,7 @@ private void checkDatasetEquivalence(
                       .read()
                       .options(finalTargetOptions)
                       .format(targetFormat.name().toLowerCase())
-                      .load(sourceTable.getBasePath())
+                      .load(sourceTable.getDataPath())
                       .orderBy(sourceTable.getOrderByColumn())
                       .filter(filterCondition);
                 }));
diff --git a/core/src/test/java/io/onetable/TestIcebergTable.java b/core/src/test/java/io/onetable/TestIcebergTable.java
index dab281b44..fde62e76b 100644
--- a/core/src/test/java/io/onetable/TestIcebergTable.java
+++ b/core/src/test/java/io/onetable/TestIcebergTable.java
@@ -18,6 +18,7 @@
 
 package io.onetable;
 
+import static io.onetable.iceberg.TestIcebergDataHelper.createIcebergDataHelper;
 import static org.apache.iceberg.SnapshotSummary.TOTAL_RECORDS_PROP;
 import static org.junit.jupiter.api.Assertions.*;
 
@@ -45,9 +46,11 @@
 import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
@@ -85,7 +88,19 @@ public static TestIcebergTable forStandardSchemaAndPartitioning(
         tempDir,
         hadoopConf,
         DEFAULT_RECORD_KEY_FIELD,
-        Collections.singletonList(partitionField));
+        Collections.singletonList(partitionField),
+        false);
+  }
+
+  public static TestIcebergTable forSchemaWithAdditionalColumnsAndPartitioning(
+      String tableName, String partitionField, Path tempDir, Configuration hadoopConf) {
+    return new TestIcebergTable(
+        tableName,
+        tempDir,
+        hadoopConf,
+        DEFAULT_RECORD_KEY_FIELD,
+        Collections.singletonList(partitionField),
+        true);
   }
 
   public TestIcebergTable(
@@ -93,18 +108,16 @@ public TestIcebergTable(
       Path tempDir,
       Configuration hadoopConf,
       String recordKeyField,
-      List partitionFields) {
+      List partitionFields,
+      boolean includeAdditionalColumns) {
     this.tableName = tableName;
    this.basePath = tempDir.toUri().toString();
     this.icebergDataHelper =
-        TestIcebergDataHelper.builder()
-            .recordKeyField(recordKeyField)
-            .partitionFieldNames(filterNullFields(partitionFields))
-            .build();
+        createIcebergDataHelper(
+            recordKeyField, filterNullFields(partitionFields), includeAdditionalColumns);
     this.schema = icebergDataHelper.getTableSchema();
-    this.hadoopConf = hadoopConf;
 
-    PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
+    PartitionSpec partitionSpec = icebergDataHelper.getPartitionSpec();
     hadoopCatalog = new HadoopCatalog(hadoopConf, basePath);
     // No namespace specified.
     TableIdentifier tableIdentifier = TableIdentifier.of(tableName);
@@ -112,7 +125,22 @@ public TestIcebergTable(
       icebergTable = hadoopCatalog.createTable(tableIdentifier, schema, partitionSpec);
     } else {
       icebergTable = hadoopCatalog.loadTable(tableIdentifier);
+      if (!icebergTable.schema().sameSchema(schema)) {
+        updateTableSchema(schema);
+      }
     }
+    this.hadoopConf = hadoopConf;
+  }
+
+  private void updateTableSchema(Schema schema) {
+    Schema currentSchema = icebergTable.schema();
+    UpdateSchema updateSchema = icebergTable.updateSchema();
+    for (Types.NestedField field : schema.columns()) {
+      if (currentSchema.findField(field.name()) == null) {
+        updateSchema.addColumn(field.name(), field.type());
+      }
+    }
+    updateSchema.commit();
   }
 
   @Override
@@ -208,7 +236,22 @@ public void deleteSpecialPartition() {
 
   @Override
   public String getBasePath() {
-    return basePath + "/" + tableName;
+    return removeSlash(basePath) + "/" + tableName;
+  }
+
+  public String getDataPath() {
+    return getBasePath() + "/data";
+  }
+
+  private String removeSlash(String path) {
+    if (path.endsWith("/")) {
+      return path.substring(0, path.length() - 1);
+    }
+    return path;
+  }
+
+  public String getTableName() {
+    return tableName;
   }
 
   @Override
@@ -240,8 +283,12 @@ public void reload() {
 
   @Override
   public List getColumnsToSelect() {
+    // There is representation difference in hudi and iceberg for local timestamp micros field.
+    // and hence excluding it from the list of columns to select.
+    // TODO(HUDI-7088): Remove filter after bug is fixed.
     return icebergDataHelper.getTableSchema().columns().stream()
         .map(Types.NestedField::name)
+        .filter(name -> !name.equals("timestamp_local_micros_nullable_field"))
        .collect(Collectors.toList());
   }
 
diff --git a/core/src/test/java/io/onetable/TestSparkDeltaTable.java b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
index 7b3caf338..6415fcd28 100644
--- a/core/src/test/java/io/onetable/TestSparkDeltaTable.java
+++ b/core/src/test/java/io/onetable/TestSparkDeltaTable.java
@@ -224,6 +224,11 @@ public Map> getRowsByPartition(List rows) {
                 row -> row.getTimestamp(4).toInstant().atZone(ZoneId.of("UTC")).getYear()));
   }
 
+  @Override
+  public String getBasePath() {
+    return basePath;
+  }
+
   @Override
   public void close() {
     // no-op as spark session lifecycle is managed by the caller
diff --git a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
index ea8260959..f2ad0c61e 100644
--- a/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
+++ b/core/src/test/java/io/onetable/iceberg/TestIcebergDataHelper.java
@@ -35,9 +35,9 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
 import lombok.Builder;
 import lombok.Value;
@@ -56,8 +56,8 @@
 @Value
 public class TestIcebergDataHelper {
   private static final Random RANDOM = new Random();
-  private static final Schema DEFAULT_TABLE_SCHEMA =
-      new Schema(
+  private static final List COMMON_FIELDS =
+      Arrays.asList(
           NestedField.optional(1, "id", Types.StringType.get()),
           NestedField.optional(2, "ts", Types.LongType.get()),
           NestedField.optional(3, "level", Types.StringType.get()),
@@ -100,13 +100,37 @@ public class TestIcebergDataHelper {
               28, "timestamp_micros_nullable_field", Types.TimestampType.withZone()),
           NestedField.optional(
               30, "timestamp_local_micros_nullable_field", Types.TimestampType.withoutZone()));
+  private static final List ADDITIONAL_FIELDS =
+      Arrays.asList(
+          NestedField.optional(31, "additional_column1", Types.StringType.get()),
+          NestedField.optional(32, "additional_column2", Types.LongType.get()));
+  private static final Schema BASE_SCHEMA = new Schema(COMMON_FIELDS);
+  private static final Schema SCHEMA_WITH_ADDITIONAL_COLUMNS =
+      new Schema(
+          Stream.concat(COMMON_FIELDS.stream(), ADDITIONAL_FIELDS.stream())
+              .collect(Collectors.toList()));
+
   private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
-  @Builder.Default Schema tableSchema = DEFAULT_TABLE_SCHEMA;
+  Schema tableSchema;
   String recordKeyField;
   List partitionFieldNames;
 
+  public static TestIcebergDataHelper createIcebergDataHelper(
+      String recordKeyField, List partitionFields, boolean includeAdditionalColumns) {
+    Schema tableSchema = getSchema(includeAdditionalColumns);
+    return TestIcebergDataHelper.builder()
+        .tableSchema(tableSchema)
+        .recordKeyField(recordKeyField)
+        .partitionFieldNames(partitionFields)
+        .build();
+  }
+
+  private static Schema getSchema(boolean includeAdditionalColumns) {
+    return includeAdditionalColumns ? SCHEMA_WITH_ADDITIONAL_COLUMNS : BASE_SCHEMA;
+  }
+
   public List generateInsertRecords(int numRecords) {
     List startTimeWindows = getStartTimeWindows();
     List endTimeWindows = getEndTimeWindows();
@@ -255,22 +279,18 @@ private Object generateRandomValueForType(
         return LocalDate.ofEpochDay(randomDay);
       case TIME:
         long totalMicrosInDay = ChronoUnit.DAYS.getDuration().toMillis() * 1000;
-        long randomTimeInMicros = ThreadLocalRandom.current().nextLong(totalMicrosInDay);
-        return randomTimeInMicros;
+        return (long) (RANDOM.nextDouble() * (totalMicrosInDay));
      case DECIMAL:
         Types.DecimalType decimalType = (Types.DecimalType) fieldType;
-        BigDecimal randomDecimal =
-            new BigDecimal(RANDOM.nextDouble() * Math.pow(10, decimalType.scale()))
-                .setScale(decimalType.scale(), RoundingMode.HALF_UP);
-        return randomDecimal;
+        return new BigDecimal(RANDOM.nextDouble() * Math.pow(10, decimalType.scale()))
+            .setScale(decimalType.scale(), RoundingMode.HALF_UP);
       case TIMESTAMP:
         Types.TimestampType timestampType = (Types.TimestampType) fieldType;
         long lowerBoundMillis = timeLowerBound.toEpochMilli();
         long upperBoundMillis = timeUpperBound.toEpochMilli();
         long randomMillisInRange =
-            lowerBoundMillis
-                + ThreadLocalRandom.current().nextLong(upperBoundMillis - lowerBoundMillis);
+            lowerBoundMillis + (long) (RANDOM.nextDouble() * (upperBoundMillis - lowerBoundMillis));
         if (timestampType.shouldAdjustToUTC()) {
           return EPOCH.plus(randomMillisInRange, ChronoUnit.MILLIS);
         } else {