diff --git a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
index b425fd018..287b8f38a 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/TableChange.java
@@ -30,7 +30,7 @@
* @since 0.1
*/
@Value
-@Builder(toBuilder = true)
+@Builder(toBuilder = true, builderClassName = "Builder")
public class TableChange {
// Change in files at the specified instant
InternalFilesDiff filesDiff;
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
new file mode 100644
index 000000000..f9480366c
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDeletionVector.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.storage;
+
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import lombok.experimental.FieldDefaults;
+import lombok.experimental.SuperBuilder;
+
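+/**
+ * Represents a deletion vector: the records at the ordinals it exposes are considered deleted from
+ * the data file identified by {@link #dataFilePath}. A minimal usage sketch:
+ *
+ * <pre>{@code
+ * List<Long> deletedOrdinals = new ArrayList<>();
+ * deletionVector.ordinalsIterator().forEachRemaining(deletedOrdinals::add);
+ * }</pre>
+ */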
+@Accessors(fluent = true)
+@SuperBuilder(toBuilder = true)
+@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
+@Getter
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+public class InternalDeletionVector extends InternalFile {
+ // path (absolute with scheme) of data file to which this deletion vector belongs
+ @NonNull String dataFilePath;
+
+ // super.getFileSizeBytes() is the size of the deletion vector file
+ // super.getPhysicalPath() is the absolute path (with scheme) of the deletion vector file
+ // super.getRecordCount() is the count of records in the deletion vector file
+
+ // offset of deletion vector start in a deletion vector file
+ int offset;
+
+ /**
+ * binary representation of the deletion vector. The consumer can use the {@link
+ * #ordinalsIterator()} to extract the ordinals represented in the binary format.
+ */
+ byte[] binaryRepresentation;
+
+ /**
+ * Supplier for an iterator that returns the ordinals of records deleted by this deletion vector
+ * in the linked data file, identified by {@link #dataFilePath}.
+ *
+ *
+ * <p>The {@link InternalDeletionVector} instance does not guarantee that a new or distinct result
+ * will be returned each time the supplier is invoked. However, the supplier is expected to return
+ * a new iterator for each call.
+ */
+ @Getter(AccessLevel.NONE)
+ Supplier<Iterator<Long>> ordinalsSupplier;
+
+ /**
+ * @return An iterator that returns the ordinals of records deleted by this deletion vector in the
+ * linked data file. There is no guarantee that a new or distinct iterator will be returned
+ * each time this method is invoked.
+ */
+ public Iterator<Long> ordinalsIterator() {
+ return ordinalsSupplier.get();
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
index 16a320f12..9e3b33cfd 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaActionsConverter.java
@@ -18,18 +18,26 @@
package org.apache.xtable.delta;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
+import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
+import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;
+
+import com.google.common.annotations.VisibleForTesting;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
@@ -38,6 +46,7 @@
import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaActionsConverter {
@@ -113,16 +122,66 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
*
* @param snapshot the commit snapshot
* @param addFile the add file action
- * @return the deletion vector representation (path of data file), or null if no deletion vector
- * is present
+ * @return the deletion vector representation, or null if no deletion vector is present
*/
- public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
+ public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) {
DeletionVectorDescriptor deletionVector = addFile.deletionVector();
if (deletionVector == null) {
return null;
}
String dataFilePath = addFile.path();
- return getFullPathToFile(snapshot, dataFilePath);
+ dataFilePath = getFullPathToFile(snapshot, dataFilePath);
+
+ InternalDeletionVector.InternalDeletionVectorBuilder<?, ?> deleteVectorBuilder =
+ InternalDeletionVector.builder()
+ .recordCount(deletionVector.cardinality())
+ .fileSizeBytes(deletionVector.sizeInBytes())
+ .dataFilePath(dataFilePath);
+
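+ // An inline deletion vector carries its serialized bitmap in the commit log entry itself,
+ // while an on-disk vector is stored in a separate file referenced by the descriptor.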
+ if (deletionVector.isInline()) {
+ deleteVectorBuilder
+ .binaryRepresentation(deletionVector.inlineData())
+ .physicalPath("")
+ .ordinalsSupplier(() -> ordinalsIterator(deletionVector.inlineData()));
+ } else {
+ Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath());
+ deleteVectorBuilder
+ .offset(getOffset(deletionVector))
+ .physicalPath(deletionVectorFilePath.toString())
+ .ordinalsSupplier(() -> ordinalsIterator(snapshot, deletionVector));
+ }
+
+ return deleteVectorBuilder.build();
+ }
+
+ private Iterator<Long> ordinalsIterator(byte[] bytes) {
+ RoaringBitmapArray rbm = RoaringBitmapArray.readFrom(bytes);
+ long[] ordinals = rbm.values();
+ return Arrays.stream(ordinals).iterator();
+ }
+
+ private Iterator<Long> ordinalsIterator(
+ Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
+ Path deletionVectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
+ int offset = getOffset(deleteVector);
+ long[] ordinals =
+ parseOrdinalFile(
+ snapshot.deltaLog().newDeltaHadoopConf(),
+ deletionVectorFilePath,
+ deleteVector.sizeInBytes(),
+ offset);
+ return Arrays.stream(ordinals).iterator();
+ }
+
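+ // The descriptor's offset marks where the bitmap starts inside the deletion vector file; when
+ // absent, it is assumed to sit right after the file's single-byte format version, i.e. offset 1.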
+ private static int getOffset(DeletionVectorDescriptor deleteVector) {
+ return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
+ }
+
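+ // Reads the serialized roaring bitmap stored in the deletion vector file and returns the
+ // ordinals of the deleted rows.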
+ @VisibleForTesting
+ long[] parseOrdinalFile(Configuration conf, Path filePath, int size, int offset) {
+ DeletionVectorStore dvStore = new HadoopFileSystemDVStore(conf);
+ RoaringBitmapArray rbm = dvStore.read(filePath, offset, size);
+ return rbm.values();
}
}
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
index 97804d5f2..3524fb8fd 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java
@@ -22,11 +22,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import lombok.Builder;
import lombok.extern.log4j.Log4j2;
@@ -53,6 +53,8 @@
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalDeletionVector;
+import org.apache.xtable.model.storage.InternalFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.spi.extractor.ConversionSource;
@@ -113,8 +115,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// All 3 of the following data structures use data file's absolute path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
- // Set of data file paths for which deletion vectors exists.
- Set<String> deletionVectors = new HashSet<>();
+ // Map of data file paths for which deletion vectors exist.
+ Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();
for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
@@ -129,10 +131,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
- String deleteVectorPath =
- actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
- if (deleteVectorPath != null) {
- deletionVectors.add(deleteVectorPath);
+ InternalDeletionVector deletionVector =
+ actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
+ if (deletionVector != null) {
+ deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
}
} else if (action instanceof RemoveFile) {
InternalDataFile dataFile =
@@ -151,7 +153,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
// same data file is removed and added, we need to remove it from the added and removed file
// maps which are used to track actual added and removed data files.
- for (String deletionVector : deletionVectors) {
+ for (String deletionVector : deletionVectors.keySet()) {
// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
@@ -163,11 +165,15 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
}
}
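+ // Deletion vectors are reported as added files alongside the data files added in this commit.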
+ List<InternalFile> allAddedFiles =
+ Stream.concat(addedFiles.values().stream(), deletionVectors.values().stream())
+ .collect(Collectors.toList());
InternalFilesDiff internalFilesDiff =
InternalFilesDiff.builder()
- .filesAdded(addedFiles.values())
+ .filesAdded(allAddedFiles)
.filesRemoved(removedFiles.values())
.build();
+
return TableChange.builder()
.tableAsOfChange(tableAtVersion)
.filesDiff(internalFilesDiff)
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index ee5b1ccdd..909b1b790 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -39,6 +39,7 @@
import org.apache.spark.sql.functions;
import org.apache.spark.sql.delta.DeltaLog;
+import org.apache.spark.sql.delta.actions.AddFile;
import com.google.common.base.Preconditions;
@@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException {
}
public List<String> getAllActiveFiles() {
- return deltaLog.snapshot().allFiles().collectAsList().stream()
+ return getAllActiveFilesInfo().stream()
.map(addFile -> addSlashToBasePath(basePath) + addFile.path())
.collect(Collectors.toList());
}
+ public List<AddFile> getAllActiveFilesInfo() {
+ return deltaLog.snapshot().allFiles().collectAsList();
+ }
+
private String addSlashToBasePath(String basePath) {
if (basePath.endsWith("/")) {
return basePath;
diff --git a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
index 9e95f2795..2c330edcb 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ValidationTestHelper.java
@@ -30,6 +30,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
public class ValidationTestHelper {
@@ -96,7 +97,7 @@ public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) {
}
private static Set<String> extractPathsFromDataFile(Set<? extends InternalFile> dataFiles) {
- return dataFiles.stream().map(InternalDataFile::getPhysicalPath).collect(Collectors.toSet());
+ return dataFiles.stream().map(InternalFile::getPhysicalPath).collect(Collectors.toSet());
}
private static void replaceFileScheme(List filePaths) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
index ed02893e3..dcfb5f80d 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaDeleteVectorConvert.java
@@ -19,11 +19,16 @@
package org.apache.xtable.delta;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +43,7 @@
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.actions.AddFile;
+import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import scala.Option;
@@ -49,6 +55,7 @@
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.storage.InternalDeletionVector;
import org.apache.xtable.model.storage.TableFormat;
public class ITDeltaDeleteVectorConvert {
@@ -56,6 +63,7 @@ public class ITDeltaDeleteVectorConvert {
private static SparkSession sparkSession;
private DeltaConversionSourceProvider conversionSourceProvider;
+ private TestSparkDeltaTable testSparkDeltaTable;
@BeforeAll
public static void setupOnce() {
@@ -91,11 +99,24 @@ void setUp() {
conversionSourceProvider.init(hadoopConf);
}
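+ // Snapshot of the table after a commit: active files keyed by absolute data file path, plus the
+ // rows deleted in that commit (empty when the commit deleted nothing).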
+ private static class TableState {
+ Map<String, AddFile> activeFiles;
+ List<Row> rowsToDelete;
+
+ TableState(Map<String, AddFile> activeFiles) {
+ this(activeFiles, Collections.emptyList());
+ }
+
+ TableState(Map<String, AddFile> activeFiles, List<Row> rowsToDelete) {
+ this.activeFiles = activeFiles;
+ this.rowsToDelete = rowsToDelete;
+ }
+ }
+
@Test
public void testInsertsUpsertsAndDeletes() {
String tableName = GenericTable.getTableName();
- TestSparkDeltaTable testSparkDeltaTable =
- new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
+ testSparkDeltaTable = new TestSparkDeltaTable(tableName, tempDir, sparkSession, null, false);
// enable deletion vectors for the test table
testSparkDeltaTable
@@ -105,25 +126,30 @@ public void testInsertsUpsertsAndDeletes() {
+ tableName
+ " SET TBLPROPERTIES ('delta.enableDeletionVectors' = true)");
- List<List<String>> allActiveFiles = new ArrayList<>();
+ List<TableState> testTableStates = new ArrayList<>();
List<TableChange> allTableChanges = new ArrayList<>();
List<Row> rows = testSparkDeltaTable.insertRows(50);
Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp();
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ Map<String, AddFile> tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles, Collections.emptyList()));
List<Row> rows1 = testSparkDeltaTable.insertRows(50);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
assertEquals(100L, testSparkDeltaTable.getNumRows());
- validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
// upsert does not create delete vectors
testSparkDeltaTable.upsertRows(rows.subList(0, 20));
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
assertEquals(100L, testSparkDeltaTable.getNumRows());
- validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 0, 0);
testSparkDeltaTable.insertRows(50);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 0, 0);
assertEquals(150L, testSparkDeltaTable.getNumRows());
// delete a few rows with gaps in ids
@@ -133,12 +159,15 @@ public void testInsertsUpsertsAndDeletes() {
.collect(Collectors.toList());
rowsToDelete.addAll(rows.subList(35, 45));
testSparkDeltaTable.deleteRows(rowsToDelete);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles, rowsToDelete));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
assertEquals(135L, testSparkDeltaTable.getNumRows());
- validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 15);
testSparkDeltaTable.insertRows(50);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 15);
assertEquals(185L, testSparkDeltaTable.getNumRows());
// delete a few rows from a file which already has a deletion vector, this should generate a
@@ -146,18 +175,22 @@ public void testInsertsUpsertsAndDeletes() {
// This deletion step intentionally deletes the same rows again to test the merge.
rowsToDelete = rows1.subList(5, 15);
testSparkDeltaTable.deleteRows(rowsToDelete);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles, rowsToDelete));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
assertEquals(178L, testSparkDeltaTable.getNumRows());
- validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), allActiveFiles.size() + 1, 2, 22);
testSparkDeltaTable.insertRows(50);
- allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles());
+ tableFiles = collectActiveFilesAfterCommit(testSparkDeltaTable);
+ testTableStates.add(new TableState(tableFiles));
+ validateDeletedRecordCount(testSparkDeltaTable.getDeltaLog(), 2, 22);
assertEquals(228L, testSparkDeltaTable.getNumRows());
+ String tableBasePath = testSparkDeltaTable.getBasePath();
SourceTable tableConfig =
SourceTable.builder()
.name(testSparkDeltaTable.getTableName())
- .basePath(testSparkDeltaTable.getBasePath())
+ .basePath(tableBasePath)
.formatName(TableFormat.DELTA)
.build();
DeltaConversionSource conversionSource =
@@ -165,8 +198,9 @@ public void testInsertsUpsertsAndDeletes() {
InternalSnapshot internalSnapshot = conversionSource.getCurrentSnapshot();
// validateDeltaPartitioning(internalSnapshot);
- ValidationTestHelper.validateSnapshot(
- internalSnapshot, allActiveFiles.get(allActiveFiles.size() - 1));
+ List<String> activeDataFilePaths =
+ new ArrayList<>(testTableStates.get(testTableStates.size() - 1).activeFiles.keySet());
+ ValidationTestHelper.validateSnapshot(internalSnapshot, activeDataFilePaths);
// Get changes in incremental format.
InstantsForIncrementalSync instantsForIncrementalSync =
@@ -179,13 +213,126 @@ public void testInsertsUpsertsAndDeletes() {
TableChange tableChange = conversionSource.getTableChangeForCommit(version);
allTableChanges.add(tableChange);
}
- ValidationTestHelper.validateTableChanges(allActiveFiles, allTableChanges);
+
+ List<List<String>> allActiveDataFilePaths =
+ testTableStates.stream()
+ .map(s -> s.activeFiles)
+ .map(Map::keySet)
+ .map(ArrayList::new)
+ .collect(Collectors.toList());
+ ValidationTestHelper.validateTableChanges(allActiveDataFilePaths, allTableChanges);
+
+ validateDeletionInfo(testTableStates, allTableChanges);
+ }
+
+ // collects the active files in the current snapshot as a map keyed by absolute data file path
+ private Map<String, AddFile> collectActiveFilesAfterCommit(
+ TestSparkDeltaTable testSparkDeltaTable) {
+ Map<String, AddFile> allFiles =
+ testSparkDeltaTable.getAllActiveFilesInfo().stream()
+ .collect(
+ Collectors.toMap(
+ file -> getAddFileAbsolutePath(file, testSparkDeltaTable.getBasePath()),
+ file -> file));
+ return allFiles;
+ }
+
+ private void validateDeletionInfo(
+ List<TableState> testTableStates, List<TableChange> allTableChanges) {
+ if (allTableChanges.isEmpty() && testTableStates.size() <= 1) {
+ return;
+ }
+
+ assertEquals(
+ allTableChanges.size(),
+ testTableStates.size() - 1,
+ "Number of table changes should be equal to number of commits - 1");
+
+ for (int i = 0; i < allTableChanges.size() - 1; i++) {
+ Map<String, AddFile> activeFileAfterCommit = testTableStates.get(i + 1).activeFiles;
+ Map<String, AddFile> activeFileBeforeCommit = testTableStates.get(i).activeFiles;
+
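+ // keep only the files whose deletion vector is new or changed in this commit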
+ Map<String, AddFile> activeFilesWithUpdatedDeleteInfo =
+ activeFileAfterCommit.entrySet().stream()
+ .filter(e -> e.getValue().deletionVector() != null)
+ .filter(
+ entry -> {
+ if (activeFileBeforeCommit.get(entry.getKey()) == null) {
+ return true;
+ }
+ if (activeFileBeforeCommit.get(entry.getKey()).deletionVector() == null) {
+ return true;
+ }
+ DeletionVectorDescriptor deletionVectorDescriptor =
+ activeFileBeforeCommit.get(entry.getKey()).deletionVector();
+ return !deletionVectorDescriptor.equals(entry.getValue().deletionVector());
+ })
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ if (activeFilesWithUpdatedDeleteInfo.isEmpty()) {
+ continue;
+ }
+
+ // validate all new delete vectors are correctly detected
+ validateDeletionInfoForCommit(
+ testTableStates.get(i + 1), activeFilesWithUpdatedDeleteInfo, allTableChanges.get(i));
+ }
+ }
+
+ private void validateDeletionInfoForCommit(
+ TableState tableState,
+ Map<String, AddFile> activeFilesAfterCommit,
+ TableChange changeDetectedForCommit) {
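+ // index the deletion vectors reported in the table change by the data file they belong to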
+ Map<String, InternalDeletionVector> detectedDeleteInfos =
+ changeDetectedForCommit.getFilesDiff().getFilesAdded().stream()
+ .filter(file -> file instanceof InternalDeletionVector)
+ .map(file -> (InternalDeletionVector) file)
+ .collect(Collectors.toMap(InternalDeletionVector::dataFilePath, file -> file));
+
+ Map<String, AddFile> filesWithDeleteVectors =
+ activeFilesAfterCommit.entrySet().stream()
+ .filter(file -> file.getValue().deletionVector() != null)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ assertEquals(filesWithDeleteVectors.size(), detectedDeleteInfos.size());
+
+ for (Map.Entry<String, AddFile> fileWithDeleteVector : filesWithDeleteVectors.entrySet()) {
+ InternalDeletionVector deleteInfo = detectedDeleteInfos.get(fileWithDeleteVector.getKey());
+ assertNotNull(deleteInfo);
+ DeletionVectorDescriptor deletionVectorDescriptor =
+ fileWithDeleteVector.getValue().deletionVector();
+ assertEquals(deletionVectorDescriptor.cardinality(), deleteInfo.getRecordCount());
+ assertEquals(deletionVectorDescriptor.sizeInBytes(), deleteInfo.getFileSizeBytes());
+ assertEquals(deletionVectorDescriptor.offset().get(), deleteInfo.offset());
+
+ String deletionFilePath =
+ deletionVectorDescriptor
+ .absolutePath(new org.apache.hadoop.fs.Path(testSparkDeltaTable.getBasePath()))
+ .toString();
+ assertEquals(deletionFilePath, deleteInfo.getPhysicalPath());
+
+ Iterator<Long> iterator = deleteInfo.ordinalsIterator();
+ List<Long> deletes = new ArrayList<>();
+ iterator.forEachRemaining(deletes::add);
+ assertEquals(deletes.size(), deleteInfo.getRecordCount());
+ }
+ }
+
+ private static String getAddFileAbsolutePath(AddFile file, String tableBasePath) {
+ String filePath = file.path();
+ if (filePath.startsWith(tableBasePath)) {
+ return filePath;
+ }
+ return Paths.get(tableBasePath, file.path()).toString();
}
private void validateDeletedRecordCount(
- DeltaLog deltaLog, int version, int deleteVectorFileCount, int deletionRecordCount) {
+ DeltaLog deltaLog, int deleteVectorFileCount, int deletionRecordCount) {
List<AddFile> allFiles =
- deltaLog.getSnapshotAt(version, Option.empty()).allFiles().collectAsList();
+ deltaLog
+ .getSnapshotAt(deltaLog.snapshot().version(), Option.empty())
+ .allFiles()
+ .collectAsList();
List<AddFile> filesWithDeletionVectors =
allFiles.stream().filter(f -> f.deletionVector() != null).collect(Collectors.toList());
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
index e62e93414..117b3fc79 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaActionsConverter.java
@@ -18,10 +18,20 @@
package org.apache.xtable.delta;
-import java.net.URISyntaxException;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -29,39 +39,155 @@
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
+import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArrayFormat;
import scala.Option;
+import org.apache.xtable.model.storage.InternalDeletionVector;
+
class TestDeltaActionsConverter {
+ private final String basePath = "https://container.blob.core.windows.net/tablepath/";
+ private final int size = 372;
+ private final long time = 376;
+ private final boolean dataChange = true;
+ private final String stats = "";
+ private final int cardinality = 42;
+ private final int offset = 634;
+
@Test
- void extractDeletionVector() throws URISyntaxException {
+ void extractMissingDeletionVector() {
DeltaActionsConverter actionsConverter = DeltaActionsConverter.getInstance();
- int size = 123;
- long time = 234L;
- boolean dataChange = true;
- String stats = "";
- String filePath = "https://container.blob.core.windows.net/tablepath/file_path";
+ String filePath = basePath + "file_path";
Snapshot snapshot = Mockito.mock(Snapshot.class);
- DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
DeletionVectorDescriptor deletionVector = null;
AddFile addFileAction =
new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
- Assertions.assertNull(actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+ InternalDeletionVector internaldeletionVector =
+ actionsConverter.extractDeletionVector(snapshot, addFileAction);
+ assertNull(internaldeletionVector);
+ }
- deletionVector =
+ @Test
+ void extractDeletionVectorInFileAbsolutePath() {
+ DeltaActionsConverter actionsConverter = spy(DeltaActionsConverter.getInstance());
+
+ String dataFilePath = "data_file";
+ String deleteFilePath = "https://container.blob.core.windows.net/tablepath/delete_file";
+ Snapshot snapshot = Mockito.mock(Snapshot.class);
+
+ DeletionVectorDescriptor deletionVector =
DeletionVectorDescriptor.onDiskWithAbsolutePath(
- filePath, size, 42, Option.empty(), Option.empty());
+ deleteFilePath, size, cardinality, Option.apply(offset), Option.empty());
- addFileAction =
- new AddFile(filePath, null, size, time, dataChange, stats, null, deletionVector);
+ AddFile addFileAction =
+ new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector);
+
+ Configuration conf = new Configuration();
+ DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+ when(snapshot.deltaLog()).thenReturn(deltaLog);
+ when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+ when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);
+
+ long[] ordinals = {45, 78, 98};
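+ // stub the deletion vector file parsing so the test does not depend on a real file on disk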
+ Mockito.doReturn(ordinals)
+ .when(actionsConverter)
+ .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset);
+
+ InternalDeletionVector internaldeletionVector =
+ actionsConverter.extractDeletionVector(snapshot, addFileAction);
+ assertNotNull(internaldeletionVector);
+ assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath());
+ assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath());
+ assertEquals(offset, internaldeletionVector.offset());
+ assertEquals(cardinality, internaldeletionVector.getRecordCount());
+ assertEquals(size, internaldeletionVector.getFileSizeBytes());
+ assertNull(internaldeletionVector.binaryRepresentation());
+
+ Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+ Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ void extractDeletionVectorInFileRelativePath() {
+ DeltaActionsConverter actionsConverter = spy(DeltaActionsConverter.getInstance());
+
+ String dataFilePath = "data_file";
+ UUID deleteFileId = UUID.randomUUID();
+ String deleteFilePath = basePath + "deletion_vector_" + deleteFileId + ".bin";
+ Snapshot snapshot = Mockito.mock(Snapshot.class);
+
+ DeletionVectorDescriptor deletionVector =
+ DeletionVectorDescriptor.onDiskWithRelativePath(
+ deleteFileId, "", size, cardinality, Option.apply(offset), Option.empty());
+
+ AddFile addFileAction =
+ new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector);
+
+ Configuration conf = new Configuration();
+ DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+ when(snapshot.deltaLog()).thenReturn(deltaLog);
+ when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+ when(deltaLog.newDeltaHadoopConf()).thenReturn(conf);
+
+ long[] ordinals = {45, 78, 98};
+ Mockito.doReturn(ordinals)
+ .when(actionsConverter)
+ .parseOrdinalFile(conf, new Path(deleteFilePath), size, offset);
+
+ InternalDeletionVector internaldeletionVector =
+ actionsConverter.extractDeletionVector(snapshot, addFileAction);
+ assertNotNull(internaldeletionVector);
+ assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath());
+ assertEquals(deleteFilePath, internaldeletionVector.getPhysicalPath());
+ assertEquals(offset, internaldeletionVector.offset());
+ assertEquals(cardinality, internaldeletionVector.getRecordCount());
+ assertEquals(size, internaldeletionVector.getFileSizeBytes());
+ assertNull(internaldeletionVector.binaryRepresentation());
+
+ Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+ Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ void extractInlineDeletionVector() {
+ DeltaActionsConverter actionsConverter = spy(DeltaActionsConverter.getInstance());
+
+ String dataFilePath = "data_file";
+ Snapshot snapshot = Mockito.mock(Snapshot.class);
+
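+ // serialize a roaring bitmap of the deleted ordinals to emulate an inline deletion vector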
+ long[] ordinals = {45, 78, 98};
+ RoaringBitmapArray rbm = new RoaringBitmapArray();
+ Arrays.stream(ordinals).forEach(rbm::add);
+ byte[] bytes = rbm.serializeAsByteArray(RoaringBitmapArrayFormat.Portable());
+
+ DeletionVectorDescriptor deletionVector =
+ DeletionVectorDescriptor.inlineInLog(bytes, cardinality);
+
+ AddFile addFileAction =
+ new AddFile(dataFilePath, null, size, time, dataChange, stats, null, deletionVector);
+
+ DeltaLog deltaLog = Mockito.mock(DeltaLog.class);
+ when(snapshot.deltaLog()).thenReturn(deltaLog);
+ when(deltaLog.dataPath()).thenReturn(new Path(basePath));
+
+ InternalDeletionVector internaldeletionVector =
+ actionsConverter.extractDeletionVector(snapshot, addFileAction);
+ assertNotNull(internaldeletionVector);
+ assertEquals(basePath + dataFilePath, internaldeletionVector.dataFilePath());
+ assertArrayEquals(bytes, internaldeletionVector.binaryRepresentation());
+ assertEquals(cardinality, internaldeletionVector.getRecordCount());
+ assertEquals(bytes.length, internaldeletionVector.getFileSizeBytes());
+ assertEquals("", internaldeletionVector.getPhysicalPath());
+ assertEquals(0, internaldeletionVector.offset());
- Mockito.when(snapshot.deltaLog()).thenReturn(deltaLog);
- Mockito.when(deltaLog.dataPath())
- .thenReturn(new Path("https://container.blob.core.windows.net/tablepath"));
- Assertions.assertEquals(
- filePath, actionsConverter.extractDeletionVectorFile(snapshot, addFileAction));
+ Iterator<Long> iterator = internaldeletionVector.ordinalsIterator();
+ Arrays.stream(ordinals).forEach(o -> assertEquals(o, iterator.next()));
+ assertFalse(iterator.hasNext());
}
}