[GOBBLIN-2159] Adding support for partition level copy in Iceberg distcp #4058

Merged Oct 23, 2024 (33 commits)

Commits
All 33 commits are by Blazer-007.

02ae2fc (Sep 12, 2024) - initial changes for iceberg distcp partition copy
981357c (Sep 16, 2024) - added datetime filter predicate with unit tests
7cd9353 (Sep 17, 2024) - changing string.class to object.class
82d10d3 (Sep 19, 2024) - updated replace partition to use serialized data files
c43d3e1 (Sep 20, 2024) - some code cleanup
0cf7638 (Sep 20, 2024) - added unit test
63bb9aa (Sep 20, 2024) - added replace partition unit test
6e1cf6b (Sep 21, 2024) - refactored and added more tests
065cde3 (Sep 21, 2024) - added javadoc
a13220d (Sep 21, 2024) - removed extra lines
e1d812f (Sep 21, 2024) - some minor changes
4364044 (Sep 22, 2024) - added retry and tests for replace partitions commit step
66d81a3 (Sep 22, 2024) - minor test changes
24b4823 (Sep 23, 2024) - added metadata validator
d8356e1 (Sep 24, 2024) - removed validator class for now
4dcc88b (Sep 27, 2024) - addressed comments and removed some classes for now
46bd976 (Sep 27, 2024) - fixing checkstyle bugs and disabling newly added tests to find root c…
e1e6f57 (Oct 8, 2024) - addressed pr comments and added few extra logs
b6163ba (Oct 17, 2024) - refactored classes
6c73a25 (Oct 17, 2024) - removed extra import statements
9c35733 (Oct 17, 2024) - enabled the tests
cdc863a (Oct 17, 2024) - fixed iceberg table tests
1dbe929 (Oct 22, 2024) - some refactoring
383ed91 (Oct 22, 2024) - refactored tests as per review comments
942ad8d (Oct 22, 2024) - throw TableNotFoundException in place of NoSuchTableException
6a4cf78 (Oct 23, 2024) - fixed throwing proper exception
2adaa8b (Oct 23, 2024) - removed unused imports
c948854 (Oct 23, 2024) - replaced RuntimeException with IOException
a55ee61 (Oct 23, 2024) - added check to avoid printing same log line
1afc37a (Oct 23, 2024) - fixed import order
bb35070 (Oct 23, 2024) - added catch for CheckedExceptionFunction.WrappedIOException wrapper
eeb8d25 (Oct 23, 2024) - fixed compile issue
675e8bb (Oct 23, 2024) - removing unwanted logging
=== BaseIcebergCatalog.java ===

@@ -42,7 +42,7 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
   }
 
   @Override
-  public IcebergTable openTable(String dbName, String tableName) {
+  public IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
     return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), getDatasetDescriptorPlatform(),
         createTableOperations(tableId),
@@ -72,5 +72,5 @@ protected String getDatasetDescriptorPlatform() {
 
   protected abstract TableOperations createTableOperations(TableIdentifier tableId);
 
-  protected abstract Table loadTableInstance(TableIdentifier tableId);
+  protected abstract Table loadTableInstance(TableIdentifier tableId) throws IcebergTable.TableNotFoundException;
 }
=== IcebergCatalog.java ===

@@ -29,10 +29,10 @@
 public interface IcebergCatalog {
 
   /** @return table identified by `dbName` and `tableName` */
-  IcebergTable openTable(String dbName, String tableName);
+  IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException;
 
   /** @return table identified by `tableId` */
-  default IcebergTable openTable(TableIdentifier tableId) {
+  default IcebergTable openTable(TableIdentifier tableId) throws IcebergTable.TableNotFoundException {
     // CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` -
     // but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)`
     return openTable(tableId.namespace().toString(), tableId.name());
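To make the CHALLENGE note concrete: the reverse-direction layout it wishes for would default the two-string form onto the `TableIdentifier` form. A hypothetical sketch, not part of this PR, and unavailable here precisely because derived classes already implement the two-string method:

  import org.apache.iceberg.catalog.TableIdentifier;

  // Hypothetical interface shape, for illustration only.
  public interface IcebergCatalogSketch {
    // single abstract entry point that implementors would override
    IcebergTable openTable(TableIdentifier tableId) throws IcebergTable.TableNotFoundException;

    // the two-string form becomes pure sugar over the TableIdentifier form
    default IcebergTable openTable(String dbName, String tableName) throws IcebergTable.TableNotFoundException {
      return openTable(TableIdentifier.of(dbName, tableName));
    }
  }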
=== IcebergHiveCatalog.java ===

@@ -64,7 +64,11 @@ public boolean tableAlreadyExists(IcebergTable icebergTable) {
   }
 
   @Override
-  protected Table loadTableInstance(TableIdentifier tableId) {
-    return hc.loadTable(tableId);
+  protected Table loadTableInstance(TableIdentifier tableId) throws IcebergTable.TableNotFoundException {
+    try {
+      return hc.loadTable(tableId);
+    } catch (Exception e) {
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }
   }
 }
=== IcebergOverwritePartitionsStep.java ===

@@ -96,13 +96,10 @@ public boolean isCompleted() {
    */
   @Override
   public void execute() throws IOException {
-    // In IcebergRegisterStep::execute we validated if dest table metadata prior to starting the generate copy entities
-    // is similar to table metadata while committing metadata in IcebergRegisterStep but that check in here will lead
-    // to failure most of the time here as it is possible that the table metadata has changed (maybe data has been
-    // written to newer partitions or other cases as well) between the time of generating copy entities
-    // and committing metadata. Hence, we are not doing that check here.
-    // Incase data has been written to the partition we are trying to overwrite, the overwrite step will remove the data
-    // and copy only data that has been collected in the copy entities.
+    // Unlike IcebergRegisterStep::execute, which validates dest table metadata has not changed between copy entity
+    // generation and the post-copy commit, we do no such validation here, so dest table writes may continue throughout
+    // our copying. Any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be
+    // clobbered and replaced by the copy entities from our execution.
     IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
     List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
     try {
@@ -124,12 +121,12 @@ public void execute() throws IOException {
         this.partitionValue
       );
     } catch (ExecutionException executionException) {
-      String msg = String.format("Failed to overwrite partitions for destination iceberg table : {%s}", this.destTableIdStr);
+      String msg = String.format("~%s~ Failed to overwrite partitions", this.destTableIdStr);
       log.error(msg, executionException);
       throw new RuntimeException(msg, executionException.getCause());
     } catch (RetryException retryException) {
       String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
-      String msg = String.format("Failed to overwrite partition for destination table : {%s} : (retried %d times) %s ",
+      String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d] %s",
           this.destTableIdStr,
           retryException.getNumberOfFailedAttempts(),
           interruptedNote);
@@ -155,7 +152,7 @@ private Retryer<Void> createOverwritePartitionsRetryer() {
       @Override
       public <V> void onRetry(Attempt<V> attempt) {
         if (attempt.hasException()) {
-          String msg = String.format("Exception caught while overwriting partitions for destination table : {%s} : [attempt: %d; %s after start]",
+          String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
              destTableIdStr,
              attempt.getAttemptNumber(),
              Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
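For context on the retry flow above: the Retryer, Attempt, RetryException, and ExecutionException types in this hunk come from the guava-retrying library. A minimal sketch of assembling such a retryer, with illustrative wait/stop bounds rather than the PR's actual configured values:

  import com.github.rholder.retry.Attempt;
  import com.github.rholder.retry.RetryException;
  import com.github.rholder.retry.RetryListener;
  import com.github.rholder.retry.Retryer;
  import com.github.rholder.retry.RetryerBuilder;
  import com.github.rholder.retry.StopStrategies;
  import com.github.rholder.retry.WaitStrategies;
  import java.util.concurrent.ExecutionException;

  public class OverwriteRetrySketch {
    static Retryer<Void> buildRetryer() {
      return RetryerBuilder.<Void>newBuilder()
          .retryIfException()                                    // any failure triggers another attempt
          .withWaitStrategy(WaitStrategies.exponentialWait())    // back off between attempts
          .withStopStrategy(StopStrategies.stopAfterAttempt(3))  // illustrative bound, not the PR's default
          .withRetryListener(new RetryListener() {
            @Override
            public <V> void onRetry(Attempt<V> attempt) {
              if (attempt.hasException()) {
                // log per-attempt diagnostics, as the onRetry override in the diff does
              }
            }
          })
          .build();
    }

    static void run(Runnable overwrite) throws ExecutionException, RetryException {
      // Retryer.call wraps the work in a Callable; exhausted retries surface as RetryException
      // and non-retried failures as ExecutionException, the two catch blocks seen in execute() above.
      buildRetryer().call(() -> { overwrite.run(); return null; });
    }
  }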
=== IcebergPartitionDataset.java ===

@@ -101,12 +101,13 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     List<CopyEntity> copyEntities = Lists.newArrayList();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
     List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
-    List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
+    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
     Configuration defaultHadoopConfiguration = new Configuration();
 
-    for (Map.Entry<Path, FileStatus> entry : getDestFilePathWithSrcFileStatus(srcDataFiles, destDataFiles, this.sourceFs).entrySet()) {
+    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
       Path destPath = entry.getKey();
       FileStatus srcFileStatus = entry.getValue();
+      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
       FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
 
       CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
@@ -121,6 +122,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     }
 
     // Adding this check to avoid adding post publish step when there are no files to copy.
+    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
     if (CollectionUtils.isNotEmpty(destDataFiles)) {
       copyEntities.add(createOverwritePostPublishStep(destDataFiles));
     }
@@ -129,45 +131,47 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
     return copyEntities;
   }
 
-  private List<DataFile> getDestDataFiles(List<DataFile> srcDataFiles) throws IcebergTable.TableNotFoundException {
+  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles)
+      throws IcebergTable.TableNotFoundException {
     String fileSet = this.getFileSetId();
-    List<DataFile> destDataFiles = new ArrayList<>();
+    Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
     if (srcDataFiles.isEmpty()) {
       log.warn("~{}~ found no data files for partition col : {} with partition value : {} to copy", fileSet,
           this.partitionColumnName, this.partitionColValue);
-      return destDataFiles;
+      return destDataFileBySrcPath;
     }
     TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
     TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
     PartitionSpec partitionSpec = destTableMetadata.spec();
-    String srcWriteDataLocation = srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
-    String destWriteDataLocation = destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "");
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect passed default value, so to avoid NPE in .replace() we are setting it to empty string.
+    String srcWriteDataLocation = Optional.ofNullable(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
+    String destWriteDataLocation = Optional.ofNullable(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
     if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
       log.warn(
-          "Either source or destination table does not have write data location : source table write data location : {} , destination table write data location : {}",
+          "~{}~ Either source or destination table does not have write data location : source table write data location : {} , destination table write data location : {}",
+          fileSet,
          srcWriteDataLocation,
          destWriteDataLocation
       );
     }
-    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
-    // doesn't respect passed default value, so to avoid NPE in .replace() we are setting it to empty string.
-    String prefixToBeReplaced = (srcWriteDataLocation != null) ? srcWriteDataLocation : "";
-    String prefixToReplaceWith = (destWriteDataLocation != null) ? destWriteDataLocation : "";
     GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
     srcDataFiles.forEach(dataFile -> {
       String srcFilePath = dataFile.path().toString();
-      Path updatedDestFilePath = relocateDestPath(srcFilePath, prefixToBeReplaced, prefixToReplaceWith);
-      destDataFiles.add(DataFiles.builder(partitionSpec)
+      Path updatedDestFilePath = relocateDestPath(srcFilePath, srcWriteDataLocation, destWriteDataLocation);
+      log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
+      destDataFileBySrcPath.put(new Path(srcFilePath), DataFiles.builder(partitionSpec)
          .copy(dataFile)
          .withPath(updatedDestFilePath.toString())
          .build());
-      log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
-      if (growthMilestoneTracker.isAnotherMilestone(destDataFiles.size())) {
-        log.info("~{}~ created {} destination data files", fileSet, destDataFiles.size());
+      if (growthMilestoneTracker.isAnotherMilestone(destDataFileBySrcPath.size())) {
+        log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
       }
     });
-    log.info("~{}~ created {} destination data files", fileSet, destDataFiles.size());
-    return destDataFiles;
+    log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
+    return destDataFileBySrcPath;
   }
 
   private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
@@ -183,16 +187,20 @@ private Path addUUIDToPath(String filePathStr) {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> getDestFilePathWithSrcFileStatus(List<DataFile> srcDataFiles,
-      List<DataFile> destDataFiles, FileSystem fs) throws IOException {
-    Map<Path, FileStatus> results = Maps.newHashMap();
-    for (int i = 0; i < srcDataFiles.size(); i++) {
-      Path srcPath = new Path(srcDataFiles.get(i).path().toString());
-      Path destPath = new Path(destDataFiles.get(i).path().toString());
-      FileStatus srcFileStatus = fs.getFileStatus(srcPath);
-      results.put(destPath, srcFileStatus);
-    }
-    return results;
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath) {
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    destDataFileBySrcPath.forEach((srcPath, destDataFile) -> {
+      FileStatus srcFileStatus;
+      try {
+        srcFileStatus = this.sourceFs.getFileStatus(srcPath);
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath);
+        log.error(errMsg);
+        throw new RuntimeException(errMsg, e);
+      }
Contributor (review comment):
I really wish java.util.function.* played along better w/ checked exceptions... but that's clearly not the case... *sigh*

throwing IOException is actually a key part of the FileSet "contract", so substituting an unchecked RuntimeException (that no caller expects and would NOT be looking out for) is not something we ought to do at this late stage.

instead, either write this iteratively (using for-each loop) or follow IcebergDataset's use of CheckedExceptionFunction.wrapToTunneled

try {
  ...
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
  wrapper.rethrowWrapped();
}

the code there actually uses:

copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()

for caching, which shouldn't be necessary here, given IcebergTable::getPartitionSpecificDataFiles examines only a single snapshot.
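A fuller sketch of the tunneling pattern suggested here, assuming Gobblin's CheckedExceptionFunction utilities behave as this comment and commit bb35070 reference them (exact generics paraphrased):

  // wrapToTunneled turns an IOException-throwing function into a plain java.util.function.Function
  // whose failures surface as the unchecked WrappedIOException...
  Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
  try {
    destDataFileBySrcPath.forEach((srcPath, destDataFile) ->
        srcFileStatusByDestFilePath.put(new Path(destDataFile.path().toString()),
            CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus).apply(srcPath)));
  } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
    wrapper.rethrowWrapped();  // ...and are re-thrown here as the checked IOException the FileSet contract expects
  }

Note the enclosing method would then declare `throws IOException`, which is what commit c948854 ("replaced RuntimeException with IOException") appears to restore.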

+      srcFileStatusByDestFilePath.put(new Path(destDataFile.path().toString()), srcFileStatus);
+    });
+    return srcFileStatusByDestFilePath;
   }
 
   private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
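For intuition, the relocation applied in calcDestDataFileBySrcPath (relocateDestPath plus addUUIDToPath, whose bodies are outside this hunk) amounts to a prefix swap followed by salting the file name with a UUID so repeated copies cannot collide. A standalone sketch with hypothetical paths; the PR's exact UUID placement may differ:

  import java.util.UUID;
  import org.apache.hadoop.fs.Path;

  public class RelocateDestPathSketch {
    // Swap the source table's write-data prefix for the destination's, then salt the file name.
    static Path relocate(String srcFilePath, String prefixToBeReplaced, String prefixToReplaceWith) {
      Path moved = new Path(srcFilePath.replace(prefixToBeReplaced, prefixToReplaceWith));
      String saltedName = UUID.randomUUID() + "-" + moved.getName();  // UUID placement is illustrative
      return new Path(moved.getParent(), saltedName);
    }

    public static void main(String[] args) {
      System.out.println(relocate(
          "hdfs://src-nn/warehouse/db/tbl/data/datepartition=2024-09-12/part-0.orc",
          "hdfs://src-nn/warehouse/db/tbl/data",
          "hdfs://dest-nn/warehouse/db/tbl/data"));
      // -> hdfs://dest-nn/warehouse/db/tbl/data/datepartition=2024-09-12/<uuid>-part-0.orc
    }
  }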
=== IcebergTable.java ===

@@ -287,6 +287,9 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
     dataFiles.forEach(overwriteFiles::addFile);
     overwriteFiles.commit();
     this.tableOps.refresh();
+    // Note: this would only arise in a high-frequency commit scenario, but there's no guarantee that the current
+    // snapshot is necessarily the one from the commit just before. Another writer could have just raced to commit
+    // in between.
     log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
   }
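The commit being logged above boils down to Iceberg's OverwriteFiles API. A minimal sketch of that flow against the public Table API (names illustrative; the PR's actual delete criteria sit outside this hunk, and overwriteByRowFilter is just one way to express "replace this partition"):

  import java.util.List;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.OverwriteFiles;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.expressions.Expressions;

  public class OverwritePartitionSketch {
    // Replace one partition's contents: delete matching rows, add the copied files, commit atomically.
    static void overwritePartition(Table table, List<DataFile> dataFiles, String partitionColName, String partitionValue) {
      OverwriteFiles overwrite = table.newOverwrite()
          .overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
      dataFiles.forEach(overwrite::addFile);
      overwrite.commit();  // produces a new snapshot
      table.refresh();
      // Per the note above: a concurrent writer may commit between our commit and this read,
      // so the id logged here is not guaranteed to be the snapshot our commit produced.
      System.out.println("SnapshotId after overwrite: " + table.currentSnapshot().snapshotId());
    }
  }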

=== IcebergMatchesAnyPropNamePartitionFilterPredicate.java ===

@@ -23,9 +23,9 @@
 import org.apache.iceberg.StructLike;
 
 /**
- * Predicate implementation for filtering Iceberg partitions based on specified partition values.
+ * Predicate implementation for filtering Iceberg partitions based on a specified partition value.
  * <p>
- * This class filters partitions by checking if the partition value matches any of the specified values.
+ * This class filters partitions by checking if the partition value matches the specified partition value.
  * </p>
  */
 public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements Predicate<StructLike> {
@@ -36,18 +36,18 @@ public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements Predic
    * Constructs an {@code IcebergMatchesAnyPropNamePartitionFilterPredicate} with the specified parameters.
    *
    * @param partitionColumnIndex the index of the partition column in partition spec
-   * @param partitionValue the partition value to match
+   * @param partitionValue the partition value used to match
    */
   public IcebergMatchesAnyPropNamePartitionFilterPredicate(int partitionColumnIndex, String partitionValue) {
     this.partitionColumnIndex = partitionColumnIndex;
     this.partitionValue = partitionValue;
   }
 
   /**
-   * Check if the partition value matches any of the specified partition values.
+   * Check if the partition value matches the specified partition value.
    *
    * @param partition the partition to check
-   * @return {@code true} if the partition value matches any of the specified values, otherwise {@code false}
+   * @return {@code true} if the partition value matches the specified partition value, otherwise {@code false}
    */
   @Override
   public boolean test(StructLike partition) {
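Usage-wise, the predicate plugs into standard java.util.function filtering over partition tuples. A brief hedged sketch (the test() body itself is outside this hunk; the index and date literal below are illustrative):

  import java.util.function.Predicate;
  import org.apache.iceberg.StructLike;

  Predicate<StructLike> predicate =
      new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, "2024-09-12");
  // e.g. keep only data files whose partition tuple satisfies the predicate:
  //   dataFiles.stream().filter(df -> predicate.test(df.partition()))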