Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
* @since 0.1
*/
@Value
@Builder(toBuilder = true)
@Builder(toBuilder = true, builderClassName = "Builder")
public class TableChange {
// Change in files at the specified instant
InternalFilesDiff filesDiff;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.model.storage;

import java.util.Iterator;
import java.util.function.Supplier;

import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.ToString;
import lombok.experimental.Accessors;
import lombok.experimental.FieldDefaults;
import lombok.experimental.SuperBuilder;

@Accessors(fluent = true)
@SuperBuilder(toBuilder = true)
@FieldDefaults(makeFinal = true, level = lombok.AccessLevel.PRIVATE)
@Getter
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class InternalDeletionVector extends InternalFile {
// path (absolute with scheme) of data file to which this deletion vector belongs
@NonNull String dataFilePath;

// super.getFileSizeBytes() is the size of the deletion vector file
// super.getPhysicalPath() is the absolute path (with scheme) of the deletion vector file
// super.getRecordCount() is the count of records in the deletion vector file

// offset of deletion vector start in a deletion vector file
int offset;

/**
* binary representation of the deletion vector. The consumer can use the {@link
* #ordinalsIterator()} to extract the ordinals represented in the binary format.
*/
byte[] binaryRepresentation;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently when this field is set to a non-null value the ordinalsIterator is also set. I think it may be cleaner to remove this and rely directly on the ordinalsIterator. Is there something in the future though where this may be used directly?

My main worry is that future developers implementing support for deletion vectors may eagerly parse the data into this field.


/**
* Supplier for an iterator that returns the ordinals of records deleted by this deletion vector
* in the linked data file, identified by {@link #dataFilePath}.
*
* <p>The {@link InternalDeletionVector} instance does not guarantee that a new or distinct result
* will be returned each time the supplier is invoked. However, the supplier is expected to return
* a new iterator for each call.
*/
@Getter(AccessLevel.NONE)
Supplier<Iterator<Long>> ordinalsSupplier;

/**
* @return An iterator that returns the ordinals of records deleted by this deletion vector in the
* linked data file. There is no guarantee that a new or distinct iterator will be returned
* each time the iterator is invoked.
*/
public Iterator<Long> ordinalsIterator() {
return ordinalsSupplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@

package org.apache.xtable.delta;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor;
import org.apache.spark.sql.delta.actions.RemoveFile;
import org.apache.spark.sql.delta.deletionvectors.RoaringBitmapArray;
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore;
import org.apache.spark.sql.delta.storage.dv.HadoopFileSystemDVStore;

import com.google.common.annotations.VisibleForTesting;

import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.model.schema.InternalField;
Expand All @@ -38,6 +46,7 @@
import org.apache.xtable.model.stat.FileStats;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalDeletionVector;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaActionsConverter {
Expand Down Expand Up @@ -113,16 +122,66 @@ static String getFullPathToFile(Snapshot snapshot, String dataFilePath) {
*
* @param snapshot the commit snapshot
* @param addFile the add file action
* @return the deletion vector representation (path of data file), or null if no deletion vector
* is present
* @return the deletion vector representation, or null if no deletion vector is present
*/
public String extractDeletionVectorFile(Snapshot snapshot, AddFile addFile) {
public InternalDeletionVector extractDeletionVector(Snapshot snapshot, AddFile addFile) {
DeletionVectorDescriptor deletionVector = addFile.deletionVector();
if (deletionVector == null) {
return null;
}

String dataFilePath = addFile.path();
return getFullPathToFile(snapshot, dataFilePath);
dataFilePath = getFullPathToFile(snapshot, dataFilePath);

InternalDeletionVector.InternalDeletionVectorBuilder<?, ?> deleteVectorBuilder =
InternalDeletionVector.builder()
.recordCount(deletionVector.cardinality())
.fileSizeBytes(deletionVector.sizeInBytes())
.dataFilePath(dataFilePath);

if (deletionVector.isInline()) {
deleteVectorBuilder
.binaryRepresentation(deletionVector.inlineData())
.physicalPath("")
.ordinalsSupplier(() -> ordinalsIterator(deletionVector.inlineData()));
} else {
Path deletionVectorFilePath = deletionVector.absolutePath(snapshot.deltaLog().dataPath());
deleteVectorBuilder
.offset(getOffset(deletionVector))
.physicalPath(deletionVectorFilePath.toString())
.ordinalsSupplier(() -> ordinalsIterator(snapshot, deletionVector));
}

return deleteVectorBuilder.build();
}

/** Decodes an inline serialized bitmap and iterates over the deleted-record ordinals it holds. */
private Iterator<Long> ordinalsIterator(byte[] bytes) {
  return Arrays.stream(RoaringBitmapArray.readFrom(bytes).values()).iterator();
}

/** Reads an on-disk deletion vector and iterates over the deleted-record ordinals it holds. */
private Iterator<Long> ordinalsIterator(
    Snapshot snapshot, DeletionVectorDescriptor deleteVector) {
  Path vectorFilePath = deleteVector.absolutePath(snapshot.deltaLog().dataPath());
  long[] deletedOrdinals =
      parseOrdinalFile(
          snapshot.deltaLog().newDeltaHadoopConf(),
          vectorFilePath,
          deleteVector.sizeInBytes(),
          getOffset(deleteVector));
  return Arrays.stream(deletedOrdinals).iterator();
}

// Returns the offset of the vector data within the deletion vector file.
// Defaults to 1 when the descriptor carries no offset — presumably to skip a leading version
// byte in the file; NOTE(review): confirm this default against the Delta DV file layout.
private static int getOffset(DeletionVectorDescriptor deleteVector) {
return deleteVector.offset().isDefined() ? (int) deleteVector.offset().get() : 1;
}

/**
 * Reads {@code size} bytes of a deletion vector file starting at {@code offset} and returns the
 * deleted-record ordinals encoded in it. Visible for testing so the file read can be stubbed.
 */
@VisibleForTesting
long[] parseOrdinalFile(Configuration conf, Path filePath, int size, int offset) {
  DeletionVectorStore store = new HadoopFileSystemDVStore(conf);
  return store.read(filePath, offset, size).values();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Builder;
import lombok.extern.log4j.Log4j2;
Expand All @@ -53,6 +53,8 @@
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.FileFormat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalDeletionVector;
import org.apache.xtable.model.storage.InternalFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.spi.extractor.ConversionSource;
Expand Down Expand Up @@ -113,8 +115,8 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// All 3 of the following data structures use data file's absolute path as the key
Map<String, InternalDataFile> addedFiles = new HashMap<>();
Map<String, InternalDataFile> removedFiles = new HashMap<>();
// Set of data file paths for which deletion vectors exists.
Set<String> deletionVectors = new HashSet<>();
// Map of data file paths for which deletion vectors exist.
Map<String, InternalDeletionVector> deletionVectors = new HashMap<>();

for (Action action : actionsForVersion) {
if (action instanceof AddFile) {
Expand All @@ -129,10 +131,10 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
DeltaPartitionExtractor.getInstance(),
DeltaStatsExtractor.getInstance());
addedFiles.put(dataFile.getPhysicalPath(), dataFile);
String deleteVectorPath =
actionsConverter.extractDeletionVectorFile(snapshotAtVersion, (AddFile) action);
if (deleteVectorPath != null) {
deletionVectors.add(deleteVectorPath);
InternalDeletionVector deletionVector =
actionsConverter.extractDeletionVector(snapshotAtVersion, (AddFile) action);
if (deletionVector != null) {
deletionVectors.put(deletionVector.dataFilePath(), deletionVector);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deletionVector.dataFilePath() points to the path of the associated Parquet Data File.

We should use deletionVector.getPhysicalPath() instead. Thoughts? @ashvina

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @piyushdubey
The intention is to use path of the data file with which this deletion vector is associated (see comment on line 118). This is used to update the maps of files added and removed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I see on line 168, you are concatenating the deletion vectors to the Internal files, which will not add dv and data file both without skipping.

Thanks for the clarification.

}
} else if (action instanceof RemoveFile) {
InternalDataFile dataFile =
Expand All @@ -151,7 +153,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
// entry which is replaced by a new entry, AddFile with delete vector information. Since the
// same data file is removed and added, we need to remove it from the added and removed file
// maps which are used to track actual added and removed data files.
for (String deletionVector : deletionVectors) {
for (String deletionVector : deletionVectors.keySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: the name deletionVector is no longer representative of the actual string. Something like dataFileForDeletionVector would be more clear

// validate that a Remove action is also added for the data file
if (removedFiles.containsKey(deletionVector)) {
addedFiles.remove(deletionVector);
Expand All @@ -163,11 +165,15 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
}
}

List<InternalFile> allAddedFiles =
Stream.concat(addedFiles.values().stream(), deletionVectors.values().stream())
.collect(Collectors.toList());
InternalFilesDiff internalFilesDiff =
InternalFilesDiff.builder()
.filesAdded(addedFiles.values())
.filesAdded(allAddedFiles)
.filesRemoved(removedFiles.values())
.build();

return TableChange.builder()
.tableAsOfChange(tableAtVersion)
.filesDiff(internalFilesDiff)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.spark.sql.functions;

import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.actions.AddFile;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -212,11 +213,15 @@ private String initBasePath(Path tempDir, String tableName) throws IOException {
}

public List<String> getAllActiveFiles() {
return deltaLog.snapshot().allFiles().collectAsList().stream()
return getAllActiveFilesInfo().stream()
.map(addFile -> addSlashToBasePath(basePath) + addFile.path())
.collect(Collectors.toList());
}

// Returns the AddFile actions for all files active in the latest snapshot of the Delta log.
public List<AddFile> getAllActiveFilesInfo() {
return deltaLog.snapshot().allFiles().collectAsList();
}

private String addSlashToBasePath(String basePath) {
if (basePath.endsWith("/")) {
return basePath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFile;

public class ValidationTestHelper {

Expand Down Expand Up @@ -96,7 +97,7 @@ public static List<String> getAllFilePaths(InternalSnapshot internalSnapshot) {
}

/** Maps each data file to its physical path. */
private static Set<String> extractPathsFromDataFile(Set<InternalDataFile> dataFiles) {
  return dataFiles.stream().map(InternalFile::getPhysicalPath).collect(Collectors.toSet());
}

private static void replaceFileScheme(List<String> filePaths) {
Expand Down
Loading