22 changes: 17 additions & 5 deletions docs/src/main/sphinx/connector/delta-lake.md
@@ -603,13 +603,25 @@ Write operations are supported for tables stored on the following systems:

- S3 and S3-compatible storage

Writes to Amazon S3 and S3-compatible storage must be enabled
with the `delta.enable-non-concurrent-writes` property. Writes to S3 can
safely be made from multiple Trino clusters; however, write collisions are not
detected when writing concurrently from other Delta Lake engines. You must
make sure that no concurrent data modifications are run to avoid data
Writes to Amazon S3 and S3-compatible storage are controlled by the following
configuration properties. When
`delta.s3.transaction-log-conditional-writes.enabled` is set to `true`
(the default), the connector uses S3 conditional writes to detect transaction
log write collisions. This mode is compatible with any other engine that also
uses conditional writes.
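
The mechanism can be illustrated with the AWS SDK directly. The sketch below
is not the connector's code; it assumes an SDK version that supports the
`If-None-Match` header on `PutObject`, and the bucket and key names are
hypothetical:

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

class ConditionalLogWrite
{
    // Attempt to create a transaction log entry exclusively; returns false
    // when another writer already committed this version.
    static boolean tryCommit(S3Client s3, byte[] entry)
    {
        try {
            s3.putObject(
                    PutObjectRequest.builder()
                            .bucket("example-bucket") // hypothetical
                            .key("table/_delta_log/00000000000000000001.json")
                            .ifNoneMatch("*") // conditional write: the PUT fails if the object exists
                            .build(),
                    RequestBody.fromBytes(entry));
            return true;
        }
        catch (S3Exception e) {
            if (e.statusCode() == 412) { // precondition failed: collision detected
                return false;
            }
            throw e;
        }
    }
}
```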

When `delta.s3.transaction-log-conditional-writes.enabled` is `false`, writes
to Amazon S3 and S3-compatible storage must be enabled with the
`delta.enable-non-concurrent-writes` property. In this mode, the connector
leverages S3's strong consistency guarantees, combined with a Trino-specific
naming strategy, to orchestrate the creation of new log files. Writes to S3
can safely be made from multiple Trino clusters that use the same write mode;
however, write collisions are not detected when writing concurrently from
other Delta Lake engines, or from Trino clusters that use S3 conditional
writes. You must ensure that no concurrent data modifications run, to avoid
data corruption.
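
As a usage sketch, a catalog opting into the legacy lock-based mode might be
configured as follows (the catalog and metastore settings are illustrative):

```text
connector.name=delta_lake
hive.metastore.uri=thrift://example.net:9083
delta.s3.transaction-log-conditional-writes.enabled=false
delta.enable-non-concurrent-writes=true
```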


(delta-lake-schema-table-management)=
### Schema and table management

2 changes: 0 additions & 2 deletions docs/src/main/sphinx/object-storage/file-system-s3.md
@@ -40,8 +40,6 @@ support:
- AWS signing protocol to use for authenticating S3 requests. Supported values are:
`AwsS3V4Signer`, `Aws4Signer`, `AsyncAws4Signer`, `Aws4UnsignedPayloadSigner`,
`EventStreamAws4Signer`.
* - `s3.exclusive-create`
- Whether conditional write is supported by the S3-compatible storage. Defaults to `true`.
* - `s3.canned-acl`
- [Canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl)
to use when uploading files to S3. Defaults to `NONE`, which has the same
S3Context.java
@@ -39,8 +39,7 @@ record S3Context(
S3SseContext s3SseContext,
Optional<AwsCredentialsProvider> credentialsProviderOverride,
StorageClassType storageClass,
ObjectCannedAcl cannedAcl,
boolean exclusiveWriteSupported)
ObjectCannedAcl cannedAcl)
{
private static final int MIN_PART_SIZE = 5 * 1024 * 1024; // S3 requirement

@@ -58,7 +57,7 @@ public RequestPayer requestPayer()

public S3Context withKmsKeyId(String kmsKeyId)
{
return new S3Context(partSize, requesterPays, S3SseContext.withKmsKeyId(kmsKeyId), credentialsProviderOverride, storageClass, cannedAcl, exclusiveWriteSupported);
return new S3Context(partSize, requesterPays, S3SseContext.withKmsKeyId(kmsKeyId), credentialsProviderOverride, storageClass, cannedAcl);
}

public S3Context withCredentials(ConnectorIdentity identity)
@@ -75,7 +74,7 @@ public S3Context withCredentials(ConnectorIdentity identity)

public S3Context withSseCustomerKey(String key)
{
return new S3Context(partSize, requesterPays, S3SseContext.withSseCustomerKey(key), credentialsProviderOverride, storageClass, cannedAcl, exclusiveWriteSupported);
return new S3Context(partSize, requesterPays, S3SseContext.withSseCustomerKey(key), credentialsProviderOverride, storageClass, cannedAcl);
}

public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credentialsProviderOverride)
@@ -86,8 +85,7 @@ public S3Context withCredentialsProviderOverride(AwsCredentialsProvider credenti
s3SseContext,
Optional.of(credentialsProviderOverride),
storageClass,
cannedAcl,
exclusiveWriteSupported);
cannedAcl);
}

public void applyCredentialProviderOverride(AwsRequestOverrideConfiguration.Builder builder)
S3FileSystemConfig.java
@@ -19,6 +19,7 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.ConfigSecuritySensitive;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
@@ -42,6 +43,7 @@
import static software.amazon.awssdk.awscore.retry.AwsRetryStrategy.legacyRetryStrategy;
import static software.amazon.awssdk.awscore.retry.AwsRetryStrategy.standardRetryStrategy;

@DefunctConfig("s3.exclusive-create")
public class S3FileSystemConfig
{
public enum S3SseType
@@ -175,7 +177,6 @@ public static RetryStrategy getRetryStrategy(RetryMode retryMode)
private ObjectCannedAcl objectCannedAcl = ObjectCannedAcl.NONE;
private RetryMode retryMode = RetryMode.LEGACY;
private int maxErrorRetries = 20;
private boolean supportsExclusiveCreate = true;
private boolean crossRegionAccessEnabled;
private String applicationId = "Trino";

@@ -612,19 +613,6 @@ public S3FileSystemConfig setNonProxyHosts(String nonProxyHosts)
return this;
}

public boolean isSupportsExclusiveCreate()
{
return supportsExclusiveCreate;
}

@Config("s3.exclusive-create")
@ConfigDescription("Whether S3-compatible storage supports exclusive create (true for Minio and AWS S3)")
public S3FileSystemConfig setSupportsExclusiveCreate(boolean supportsExclusiveCreate)
{
this.supportsExclusiveCreate = supportsExclusiveCreate;
return this;
}

public boolean isCrossRegionAccessEnabled()
{
return crossRegionAccessEnabled;
S3FileSystemLoader.java
@@ -100,8 +100,7 @@ private S3FileSystemLoader(Optional<S3SecurityMappingProvider> mappingProvider,
config.getSseCustomerKey()),
Optional.empty(),
config.getStorageClass(),
config.getCannedAcl(),
config.isSupportsExclusiveCreate());
config.getCannedAcl());
}

@Override
@@ -59,10 +59,6 @@ public void createOrOverwrite(byte[] data)
public void createExclusive(byte[] data)
throws IOException
{
if (!context.exclusiveWriteSupported()) {
throw new UnsupportedOperationException("createExclusive not supported by " + getClass());
}

try (OutputStream out = create(newSimpleAggregatedMemoryContext(), true)) {
out.write(data);
}
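
With the capability check removed, `createExclusive` always attempts the
conditional create, and callers handle the collision. A hedged usage sketch
(the location and variable names are hypothetical):

```java
import java.nio.file.FileAlreadyExistsException;

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoOutputFile;

// fileSystem and entryBytes come from surrounding code in this sketch
TrinoOutputFile logEntry = fileSystem.newOutputFile(
        Location.of("s3://example-bucket/table/_delta_log/00000000000000000001.json"));
try {
    logEntry.createExclusive(entryBytes); // fails if the file already exists
}
catch (FileAlreadyExistsException e) {
    // another writer committed this log version first; retry with the next version number
}
```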
@@ -93,7 +93,6 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setAwsSecretKey(secretKey)
.setRegion(region)
.setEndpoint(endpoint)
.setSupportsExclusiveCreate(true)
.setSignerType(S3FileSystemConfig.SignerType.AwsS3V4Signer)
.setStreamingPartSize(streamingPartSize),
new S3FileSystemStats());
@@ -75,7 +75,6 @@ public void testDefaults()
.setHttpProxyUsername(null)
.setHttpProxyPassword(null)
.setHttpProxyPreemptiveBasicProxyAuth(false)
.setSupportsExclusiveCreate(true)
.setCrossRegionAccessEnabled(false)
.setApplicationId("Trino"));
}
@@ -117,7 +116,6 @@ public void testExplicitPropertyMappings()
.put("s3.http-proxy.username", "test")
.put("s3.http-proxy.password", "test")
.put("s3.http-proxy.preemptive-basic-auth", "true")
.put("s3.exclusive-create", "false")
.put("s3.application-id", "application id")
.put("s3.cross-region-access", "true")
.buildOrThrow();
@@ -156,7 +154,6 @@ public void testExplicitPropertyMappings()
.setHttpProxyUsername("test")
.setHttpProxyPassword("test")
.setHttpProxyPreemptiveBasicProxyAuth(true)
.setSupportsExclusiveCreate(false)
.setCrossRegionAccessEnabled(true)
.setApplicationId("application id");

@@ -89,7 +89,6 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setPathStyleAccess(true)
.setAwsAccessKey(Minio.MINIO_ACCESS_KEY)
.setAwsSecretKey(Minio.MINIO_SECRET_KEY)
.setSupportsExclusiveCreate(true)
.setStreamingPartSize(streamingPartSize),
new S3FileSystemStats());
}
@@ -68,7 +68,6 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setPathStyleAccess(true)
.setAwsAccessKey(MOTO_ACCESS_KEY)
.setAwsSecretKey(MOTO_SECRET_KEY)
.setSupportsExclusiveCreate(true)
.setStreamingPartSize(streamingPartSize),
new S3FileSystemStats());
}
TestS3FileSystemS3Mock.java
@@ -36,15 +36,9 @@ public class TestS3FileSystemS3Mock
private static final String BUCKET = "test-bucket";

@Container
private static final S3MockContainer S3_MOCK = new S3MockContainer("3.0.1")
private static final S3MockContainer S3_MOCK = new S3MockContainer("4.10.0")
.withInitialBuckets(BUCKET);

@Override
protected boolean supportsCreateExclusive()
{
return false; // not supported by s3-mock
}

@Override
protected String bucket()
{
@@ -78,11 +72,23 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setRegion(Region.US_EAST_1.id())
.setPathStyleAccess(true)
.setStreamingPartSize(streamingPartSize)
.setSignerType(S3FileSystemConfig.SignerType.AwsS3V4Signer)
.setSupportsExclusiveCreate(false),
.setSignerType(S3FileSystemConfig.SignerType.AwsS3V4Signer),
new S3FileSystemStats());
}

@Test
@Override
public void testOutputFile()
{
// this is an S3Mock bug, see https://github.com/adobe/S3Mock/issues/2790
assertThatThrownBy(super::testOutputFile)
.hasMessageContaining("""
Expecting actual throwable to be an instance of:
java.nio.file.FileAlreadyExistsException
but was:
java.io.IOException: software.amazon.awssdk.services.s3.model.S3Exception: (Service: S3, Status Code: 304""");
}

@Test
@Override
public void testPreSignedUris()
@@ -91,4 +97,22 @@ public void testPreSignedUris()
assertThatThrownBy(super::testPreSignedUris)
.hasMessageContaining("Expecting code to raise a throwable");
}

@Test
@Override
public void testPaths()
{
// this is an S3Mock bug, see https://github.com/adobe/S3Mock/issues/2788
assertThatThrownBy(super::testPaths)
.hasMessageContaining("S3 HEAD request failed for file: s3://test-bucket/test/.././/file");
}

@Test
@Override
public void testReadingEmptyFile()
{
// this is an S3Mock bug, see https://github.com/adobe/S3Mock/issues/2789
assertThatThrownBy(super::testReadingEmptyFile)
.hasMessageContaining("Failed to open S3 file: s3://test-bucket/inputStream/");
}
}
@@ -580,7 +580,7 @@ public void testInputFile()
}

@Test
void testOutputFile()
public void testOutputFile()
throws IOException
{
// an output file cannot be created at the root of the file system
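
This visibility change widens `testOutputFile()` from package-private to
`public` so that subclasses in other packages, such as `TestS3FileSystemS3Mock`
above, can override it with `@Test @Override`; Java does not permit overriding
a package-private method from outside its package.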
TestHdfsFileSystemS3Mock.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@@ -44,6 +45,7 @@

import static java.util.Collections.emptySet;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@Testcontainers
public class TestHdfsFileSystemS3Mock
@@ -52,7 +54,7 @@ public class TestHdfsFileSystemS3Mock
private static final String BUCKET = "test-bucket";

@Container
private static final S3MockContainer S3_MOCK = new S3MockContainer("3.0.1")
private static final S3MockContainer S3_MOCK = new S3MockContainer("4.10.0")
.withInitialBuckets(BUCKET);

private HdfsEnvironment hdfsEnvironment;
@@ -147,4 +149,13 @@ protected void verifyFileSystemIsEmpty()
throw new UncheckedIOException(e);
}
}

@Test
@Override
public void testPaths()
{
// this is an S3Mock bug, see https://github.com/adobe/S3Mock/issues/2788
assertThatThrownBy(super::testPaths)
.hasMessageFindingMatch("Status Code: 400; Error Code: 400 .*\\Q(Bucket: test-bucket, Key: test/.././/file)");
}
}
5 changes: 0 additions & 5 deletions plugin/trino-delta-lake/pom.xml
@@ -103,11 +103,6 @@
<artifactId>trino-filesystem-manager</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-filesystem-s3</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-hive</artifactId>
DeltaLakeConfig.java
@@ -71,6 +71,7 @@ public class DeltaLakeConfig
private DataSize maxSplitSize = DataSize.of(128, MEGABYTE);
private double minimumAssignedSplitWeight = 0.05;
private int maxPartitionsPerWriter = 100;
private boolean s3TransactionLogConditionalWritesEnabled = true;
private boolean unsafeWritesEnabled;
private boolean checkpointRowStatisticsWritingEnabled = true;
private long defaultCheckpointWritingInterval = 10;
@@ -248,6 +249,20 @@ public DeltaLakeConfig setMaxPartitionsPerWriter(int maxPartitionsPerWriter)
return this;
}

public boolean isS3TransactionLogConditionalWritesEnabled()
{
return s3TransactionLogConditionalWritesEnabled;
}

@Config("delta.s3.transaction-log-conditional-writes.enabled")
@LegacyConfig("s3.exclusive-create")
@ConfigDescription("Whether to use conditional writes when writing Delta table transaction log files on S3. If false, lock-file-based synchronization is used.")
public DeltaLakeConfig setS3TransactionLogConditionalWritesEnabled(boolean s3TransactionLogConditionalWritesEnabled)
{
this.s3TransactionLogConditionalWritesEnabled = s3TransactionLogConditionalWritesEnabled;
return this;
}

public boolean getUnsafeWritesEnabled()
{
return unsafeWritesEnabled;
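
Together with the `@DefunctConfig("s3.exclusive-create")` added to
`S3FileSystemConfig` above, this completes the property migration. A minimal
sketch of the airlift annotation semantics assumed here (hypothetical class
and property names): a defunct property fails configuration validation at
startup, while a legacy name is still accepted and remapped to the new
property.

```java
import io.airlift.configuration.Config;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;

// Hypothetical config class illustrating the migration pattern in this PR.
@DefunctConfig("example.removed-property") // setting this name fails startup
public class ExampleConfig
{
    private boolean featureEnabled = true;

    public boolean isFeatureEnabled()
    {
        return featureEnabled;
    }

    @Config("example.feature.enabled")
    @LegacyConfig("example.feature-enabled") // old name remapped to the new one
    public ExampleConfig setFeatureEnabled(boolean featureEnabled)
    {
        this.featureEnabled = featureEnabled;
        return this;
    }
}
```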
@@ -18,7 +18,6 @@
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.filesystem.s3.S3FileSystemConfig;
import io.trino.plugin.deltalake.transactionlog.writer.AzureTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.GcsTransactionLogSynchronizer;
import io.trino.plugin.deltalake.transactionlog.writer.S3ConditionalWriteLogSynchronizer;
@@ -49,7 +48,7 @@ protected void setup(Binder binder)
binder.bind(S3LockBasedTransactionLogSynchronizer.class).in(Scopes.SINGLETON);
binder.bind(S3ConditionalWriteLogSynchronizer.class).in(Scopes.SINGLETON);

install(conditionalModule(S3FileSystemConfig.class, S3FileSystemConfig::isSupportsExclusiveCreate,
install(conditionalModule(DeltaLakeConfig.class, DeltaLakeConfig::isS3TransactionLogConditionalWritesEnabled,
s3SynchronizerModule(S3ConditionalWriteLogSynchronizer.class),
s3SynchronizerModule(S3LockBasedTransactionLogSynchronizer.class)));
}
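
The `s3SynchronizerModule` helper is not shown in this diff; a plausible
sketch of what it binds (the MapBinder wiring below is an assumption, keyed by
location scheme):

```java
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;

// Assumed sketch -- registers the chosen synchronizer under the "s3" scheme
// key so the transaction log writer can select it by table location scheme.
static Module s3SynchronizerModule(Class<? extends TransactionLogSynchronizer> synchronizerClass)
{
    return binder -> MapBinder.newMapBinder(binder, String.class, TransactionLogSynchronizer.class)
            .addBinding("s3")
            .to(synchronizerClass)
            .in(Scopes.SINGLETON);
}
```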
@@ -51,6 +51,7 @@ public void testDefaults()
.setMaxSplitSize(DataSize.of(128, DataSize.Unit.MEGABYTE))
.setMinimumAssignedSplitWeight(0.05)
.setMaxPartitionsPerWriter(100)
.setS3TransactionLogConditionalWritesEnabled(true)
.setUnsafeWritesEnabled(false)
.setDefaultCheckpointWritingInterval(10)
.setCheckpointFilteringEnabled(true)
@@ -95,6 +96,7 @@ public void testExplicitPropertyMappings()
.put("delta.max-split-size", "10 MB")
.put("delta.minimum-assigned-split-weight", "0.01")
.put("delta.max-partitions-per-writer", "200")
.put("delta.s3.transaction-log-conditional-writes.enabled", "false")
.put("delta.enable-non-concurrent-writes", "true")
.put("delta.default-checkpoint-writing-interval", "15")
.put("delta.checkpoint-filtering.enabled", "false")
@@ -136,6 +138,7 @@
.setMaxSplitSize(DataSize.of(10, DataSize.Unit.MEGABYTE))
.setMinimumAssignedSplitWeight(0.01)
.setMaxPartitionsPerWriter(200)
.setS3TransactionLogConditionalWritesEnabled(false)
.setUnsafeWritesEnabled(true)
.setDefaultCheckpointWritingInterval(15)
.setCheckpointRowStatisticsWritingEnabled(false)
@@ -96,7 +96,7 @@ protected QueryRunner createQueryRunner()
.put("s3.endpoint", minio.getMinioAddress())
.put("s3.path-style-access", "true")
.put("s3.streaming.part-size", "5MB") // minimize memory usage
.put("s3.exclusive-create", "false") // disable so we can test our own locking scheme
.put("delta.s3.transaction-log-conditional-writes.enabled", "false") // disable so we can test our own locking scheme
.put("delta.metastore.store-table-metadata", "true")
.put("delta.enable-non-concurrent-writes", "true")
.put("delta.register-table-procedure.enabled", "true")