Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.2] Fix the consistency issue of listFrom API #4081

Open
wants to merge 1 commit into
base: branch-3.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,9 @@ public void write(
resolvedPath.getName(),
tempPath,
false, // not complete
null // no expireTime
null, // no expireTime
fs.getDefaultBlockSize(),
System.currentTimeMillis()
);

// Step 2.1: Create temp file T(N)
Expand Down Expand Up @@ -455,7 +457,7 @@ private void copyFile(FileSystem fs, Path src, Path dst) throws IOException {
/**
* Returns path stripped user info.
*/
private Path stripUserInfo(Path path) {
protected Path stripUserInfo(Path path) {
final URI uri = path.toUri();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,31 @@ public final class ExternalCommitEntry {
*/
public final Long expireTime;

/**
 * Size of the commit file in bytes; may be null for entries written
 * before this attribute existed (callers substitute a default of 0).
 */
public final Long fileSize;

/**
 * Modification time of the commit file; may be null for entries written
 * before this attribute existed. Units appear to be epoch milliseconds
 * (callers pass System.currentTimeMillis()) — TODO confirm, since one
 * read path defaults to a seconds-based value.
 */
public final Long modificationTime;

/**
 * Creates an entry describing one commit record in the external store.
 *
 * @param tablePath root path of the Delta table this entry belongs to
 * @param fileName name of the target commit file
 * @param tempPath path of the temporary copy of the commit file
 * @param complete whether the commit file has been fully written to its
 *                 final destination
 * @param expireTime TTL timestamp used for cleanup, or null when not set
 * @param fileSize size of the commit file in bytes, or null when unknown
 * @param modificationTime modification time of the commit file, or null
 *                         when unknown
 */
public ExternalCommitEntry(
        Path tablePath,
        String fileName,
        String tempPath,
        boolean complete,
        Long expireTime,
        Long fileSize,
        Long modificationTime) {
    // Plain field initialization; this class is an immutable value holder.
    this.fileSize = fileSize;
    this.modificationTime = modificationTime;
    this.expireTime = expireTime;
    this.complete = complete;
    this.tempPath = tempPath;
    this.fileName = fileName;
    this.tablePath = tablePath;
}

/**
Expand All @@ -73,7 +87,9 @@ public ExternalCommitEntry asComplete(long expirationDelaySeconds) {
this.fileName,
this.tempPath,
true,
System.currentTimeMillis() / 1000L + expirationDelaySeconds
System.currentTimeMillis() / 1000L + expirationDelaySeconds,
this.fileSize,
this.modificationTime
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* Copyright (2021) The Delta Lake Project Authors.
*
Expand All @@ -15,21 +16,31 @@
*/

package io.delta.storage;

import io.delta.storage.internal.S3LogStoreUtil;
import io.delta.storage.utils.ReflectionUtils;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;

import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.conf.Configuration;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
Expand Down Expand Up @@ -87,7 +98,6 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
public static final String DDB_CREATE_TABLE_RCU = "provisionedThroughput.rcu";
public static final String DDB_CREATE_TABLE_WCU = "provisionedThroughput.wcu";

// WARNING: setting this value too low can cause data loss. Defaults to a duration of 1 day.
public static final String TTL_SECONDS = "ddb.ttl";

Expand All @@ -100,6 +110,8 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
private static final String ATTR_COMPLETE = "complete";
private static final String ATTR_EXPIRE_TIME = "expireTime";

private static final String ATTR_FILE_SIZE = "fileSize";
private static final String ATTR_MODI_TIME = "modificationTime";
/**
* Member fields
*/
Expand Down Expand Up @@ -139,6 +151,84 @@ public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
tryEnsureTableExists(hadoopConf);
}

/**
 * Merges two listings into a single iterator, deduplicating by path and
 * sorting by file name. When both listings contain an entry for the same
 * path, the entry from {@code listWithPrecedence} wins.
 *
 * @param list lower-priority entries
 * @param listWithPrecedence entries that override {@code list} on conflict
 * @return entries from both lists, deduplicated and ordered by path name
 */
private Iterator<FileStatus> mergeFileLists(
        List<FileStatus> list,
        List<FileStatus> listWithPrecedence) {
    final Map<Path, FileStatus> byPath = new HashMap<>();

    // Seed with the high-priority entries first; putIfAbsent then keeps
    // them when the same path shows up in the lower-priority list.
    for (FileStatus status : listWithPrecedence) {
        byPath.putIfAbsent(status.getPath(), status);
    }
    for (FileStatus status : list) {
        byPath.putIfAbsent(status.getPath(), status);
    }

    final List<FileStatus> merged = new ArrayList<>(byPath.values());
    merged.sort(Comparator.comparing(status -> status.getPath().getName()));
    return merged.iterator();
}

/**
* List files starting from `resolvedPath` (inclusive) in the same directory.
*/
/**
 * Queries the external store (DynamoDB) for the latest commit entry of the
 * table containing {@code resolvedPath} and, when that entry is complete,
 * returns it as a single-element FileStatus list. This covers the window in
 * which a commit exists in the external store but is not yet visible in an
 * eventually-consistent filesystem listing.
 *
 * @param fs filesystem used to derive the default block size for the result
 * @param resolvedPath fully-qualified path the caller is listing from
 * @return at most one FileStatus (the latest complete entry); possibly empty
 */
private List<FileStatus> listFromCache(
        FileSystem fs,
        Path resolvedPath) {
    final Path pathKey = super.stripUserInfo(resolvedPath);
    final Path tablePath = getTablePath(resolvedPath);

    // Key conditions: same table, and file name >= the requested start name.
    // A plain HashMap suffices — this map is local to the request being built.
    final Map<String, Condition> conditions = new HashMap<>();
    conditions.put(
        ATTR_TABLE_PATH,
        new Condition()
            .withComparisonOperator(ComparisonOperator.EQ)
            .withAttributeValueList(new AttributeValue(tablePath.toString()))
    );
    conditions.put(
        ATTR_FILE_NAME,
        new Condition()
            .withComparisonOperator(ComparisonOperator.GE)
            .withAttributeValueList(new AttributeValue(pathKey.getName()))
    );

    // Descending scan with limit 1: only the single latest commit can be
    // missing from the filesystem listing, so one row is all we need.
    final List<Map<String, AttributeValue>> items = client.query(
        new QueryRequest(tableName)
            .withConsistentRead(true)
            .withScanIndexForward(false)
            .withLimit(1)
            .withKeyConditions(conditions)
    ).getItems();

    final List<FileStatus> statuses = new ArrayList<>();
    items.forEach(item -> {
        final ExternalCommitEntry entry = dbResultToCommitEntry(item);
        if (entry.complete) {
            // Null-safe defaults for rows written before these attributes existed.
            final long fileSize = entry.fileSize != null ? entry.fileSize : 0L;
            final long modificationTime = entry.modificationTime != null
                ? entry.modificationTime
                : System.currentTimeMillis();
            // FileStatus(length, isDirectory, replication, blockSize, mtime, path):
            // the previous code passed the nullable entry.fileSize as the length
            // (NPE risk on auto-unboxing) and the null-safe fileSize into the
            // blockSize slot. Use the null-safe length and a real block size.
            statuses.add(
                new FileStatus(
                    fileSize,
                    false,
                    1,
                    fs.getDefaultBlockSize(entry.absoluteFilePath()),
                    modificationTime,
                    entry.absoluteFilePath()
                ));
        }
    });
    return statuses;
}

/**
 * Lists commit files starting from {@code path} (inclusive), merging the
 * filesystem listing with the latest complete entry recorded in the external
 * store. For any path present in both sources, the filesystem's FileStatus
 * takes precedence.
 */
@Override
public Iterator<FileStatus> listFrom(Path path, Configuration hadoopConf) throws IOException {
    final FileSystem fs = path.getFileSystem(hadoopConf);
    final Path resolvedPath = stripUserInfo(fs.makeQualified(path));

    final List<FileStatus> fsListing = Lists.newArrayList(super.listFrom(path, hadoopConf));

    // Entries the external store knows about but an eventually-consistent
    // filesystem may not surface yet.
    final List<FileStatus> cacheListing = listFromCache(fs, resolvedPath);

    // Second argument wins on conflicts, so the filesystem view has precedence.
    return mergeFileLists(cacheListing, fsListing);
}

@Override
public CloseableIterator<String> read(Path path, Configuration hadoopConf) throws IOException {
// With many concurrent readers/writers, there's a chance that concurrent 'recovery'
Expand Down Expand Up @@ -222,11 +312,16 @@ protected Optional<ExternalCommitEntry> getLatestExternalEntry(Path tablePath) {
*/
/**
 * Converts a raw DynamoDB item into an {@link ExternalCommitEntry}.
 *
 * Rows written before the fileSize / modificationTime attributes existed may
 * lack them, so both fall back to defaults when absent.
 *
 * BUG FIX: the constructor signature is (tablePath, fileName, tempPath,
 * complete, expireTime, fileSize, modificationTime); the previous code passed
 * (…, complete, fileSize, modiTime, expireTime). Since all three trailing
 * parameters are Long, this compiled but silently stored every value in the
 * wrong field. Arguments are now passed in declaration order.
 */
private ExternalCommitEntry dbResultToCommitEntry(Map<String, AttributeValue> item) {
    final AttributeValue expireTimeAttr = item.get(ATTR_EXPIRE_TIME);
    final AttributeValue fileSizeAttr = item.get(ATTR_FILE_SIZE);
    final AttributeValue modiTimeAttr = item.get(ATTR_MODI_TIME);

    final Long fileSize =
        fileSizeAttr != null ? Long.parseLong(fileSizeAttr.getN()) : 0L;
    // NOTE(review): this default is in seconds while createPutItemRequest
    // writes System.currentTimeMillis() — confirm the intended unit.
    final Long modiTime =
        modiTimeAttr != null
            ? Long.parseLong(modiTimeAttr.getN())
            : System.currentTimeMillis() / 1000L;

    return new ExternalCommitEntry(
        new Path(item.get(ATTR_TABLE_PATH).getS()),
        item.get(ATTR_FILE_NAME).getS(),
        item.get(ATTR_TEMP_PATH).getS(),
        item.get(ATTR_COMPLETE).getS().equals("true"),
        expireTimeAttr != null ? Long.parseLong(expireTimeAttr.getN()) : null,
        fileSize,
        modiTime
    );
}
Expand All @@ -236,6 +331,11 @@ private PutItemRequest createPutItemRequest(ExternalCommitEntry entry, boolean o
attributes.put(ATTR_TABLE_PATH, new AttributeValue(entry.tablePath.toString()));
attributes.put(ATTR_FILE_NAME, new AttributeValue(entry.fileName));
attributes.put(ATTR_TEMP_PATH, new AttributeValue(entry.tempPath));
attributes.put(ATTR_FILE_SIZE, new AttributeValue().withN(
String.valueOf(entry.fileSize != null ? entry.fileSize : 0L)));
attributes.put(ATTR_MODI_TIME, new AttributeValue().withN(
String.valueOf(entry.modificationTime != null ? entry.modificationTime : System.currentTimeMillis())
));
attributes.put(
ATTR_COMPLETE,
new AttributeValue().withS(Boolean.toString(entry.complete))
Expand Down
Loading