Commit 7d0111d

LadyForest and XComp authored
[FLINK-29114][connector][filesystem] Fix issue of file overwriting caused by multiple writes to the same sink table and shared staging directory
This closes apache#24390

* Fix unstable TableSourceITCase#testTableHintWithLogicalTableScanReuse
* Moves the staging dir configuration into builder for easier testing

---------

Co-authored-by: Matthias Pohl <[email protected]>
1 parent 398bb50 commit 7d0111d
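
For context on the fix: before this commit, FileSystemTableSink.toStagingPath() derived the staging directory from the table path and the current millisecond only, and reused the directory if it already existed, so multiple writes to the same sink table could stage files into the same directory and overwrite each other's output. The diff below moves staging-path creation into FileSystemOutputFormat.Builder, appends a random UUID to the directory name, and fails fast if the directory already exists. A minimal sketch of the new naming scheme in plain Java (the table path and printed output are hypothetical, and the real code builds an org.apache.flink.core.fs.Path rather than a string):

    import java.util.UUID;

    public class StagingPathNamingSketch {
        public static void main(String[] args) {
            // Hypothetical sink table location; in Flink this is an org.apache.flink.core.fs.Path.
            String tablePath = "/warehouse/db/my_table";

            // Same naming scheme as Builder#toStagingPath in this commit:
            // a timestamp alone can collide across concurrent jobs, so a UUID is appended.
            String stagingDir = String.format(
                    "%s/.staging_%d_%s", tablePath, System.currentTimeMillis(), UUID.randomUUID());

            System.out.println(stagingDir);
            // e.g. /warehouse/db/my_table/.staging_1709200000000_0f8fad5b-d9cb-469f-a165-70867728950e
        }
    }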

File tree

5 files changed: +222 -149 lines changed


flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e

+8 -7
@@ -679,17 +679,18 @@ Method <org.apache.flink.connector.file.table.DynamicPartitionWriter.write(java.
 Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.convertStringToInternalValue(java.lang.String, org.apache.flink.table.types.DataType)> in (FileInfoExtractorBulkFormat.java:156)
 Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileInfoExtractorBulkFormat.java:140)
 Method <org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileSystemCommitter.java:146)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:288)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:289)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:290)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:291)
-Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:292)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:324)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:325)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:326)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:327)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.build()> calls method <org.apache.flink.util.Preconditions.checkNotNull(java.lang.Object, java.lang.String)> in (FileSystemOutputFormat.java:328)
 Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSystemOutputFormat.java:0)
-Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:566)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:291)
+Method <org.apache.flink.connector.file.table.FileSystemOutputFormat.createStagingDirectory(org.apache.flink.core.fs.Path)> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.String, [Ljava.lang.Object;)> in (FileSystemOutputFormat.java:109)
+Method <org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:553)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSystemTableSink.java:208)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:189)
 Method <org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(org.apache.flink.table.connector.ProviderContext, org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:233)
-Method <org.apache.flink.connector.file.table.FileSystemTableSink.toStagingPath()> calls method <org.apache.flink.util.Preconditions.checkState(boolean, java.lang.Object)> in (FileSystemTableSink.java:380)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.listPartitions()> calls method <org.apache.flink.table.utils.PartitionPathUtils.searchPartSpecAndPaths(org.apache.flink.core.fs.FileSystem, org.apache.flink.core.fs.Path, int)> in (FileSystemTableSource.java:328)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> has return type <[Lorg.apache.flink.core.fs.Path;> in (FileSystemTableSource.java:0)
 Method <org.apache.flink.connector.file.table.FileSystemTableSource.paths()> references method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSource.java:295)

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java

+43 -12
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.table;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
 import org.apache.flink.api.common.io.FinalizeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
@@ -28,12 +29,14 @@
 import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -56,7 +59,7 @@ public class FileSystemOutputFormat<T>
     private final TableMetaStoreFactory msFactory;
     private final boolean overwrite;
     private final boolean isToLocal;
-    private final Path tmpPath;
+    private final Path stagingPath;
     private final String[] partitionColumns;
     private final boolean dynamicGrouped;
     private final LinkedHashMap<String, String> staticPartitions;
@@ -74,7 +77,7 @@ private FileSystemOutputFormat(
             TableMetaStoreFactory msFactory,
             boolean overwrite,
             boolean isToLocal,
-            Path tmpPath,
+            Path stagingPath,
             String[] partitionColumns,
             boolean dynamicGrouped,
             LinkedHashMap<String, String> staticPartitions,
@@ -87,7 +90,7 @@ private FileSystemOutputFormat(
         this.msFactory = msFactory;
         this.overwrite = overwrite;
         this.isToLocal = isToLocal;
-        this.tmpPath = tmpPath;
+        this.stagingPath = stagingPath;
         this.partitionColumns = partitionColumns;
         this.dynamicGrouped = dynamicGrouped;
         this.staticPartitions = staticPartitions;
@@ -96,6 +99,22 @@ private FileSystemOutputFormat(
         this.outputFileConfig = outputFileConfig;
         this.identifier = identifier;
         this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
+
+        createStagingDirectory(this.stagingPath);
+    }
+
+    private static void createStagingDirectory(Path stagingPath) {
+        try {
+            final FileSystem stagingFileSystem = stagingPath.getFileSystem();
+            Preconditions.checkState(
+                    !stagingFileSystem.exists(stagingPath),
+                    "Staging dir %s already exists",
+                    stagingPath);
+            stagingFileSystem.mkdirs(stagingPath);
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "An IO error occurred while accessing the staging FileSystem.", e);
+        }
     }
 
     @Override
@@ -108,7 +127,7 @@ public void finalizeGlobal(FinalizationContext context) {
                 Thread.currentThread().getContextClassLoader(),
                 () -> {
                     try {
-                        return fsFactory.create(tmpPath.toUri());
+                        return fsFactory.create(stagingPath.toUri());
                     } catch (IOException e) {
                         throw new RuntimeException(e);
                    }
@@ -120,7 +139,7 @@ public void finalizeGlobal(FinalizationContext context) {
                     fsFactory,
                     msFactory,
                     overwrite,
-                    tmpPath,
+                    stagingPath,
                     partitionColumns.length,
                     isToLocal,
                     identifier,
@@ -141,7 +160,7 @@ public void finalizeGlobal(FinalizationContext context) {
             throw new TableException("Exception in finalizeGlobal", e);
         } finally {
             try {
-                fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
+                fsFactory.create(stagingPath.toUri()).delete(stagingPath, true);
             } catch (IOException ignore) {
             }
         }
@@ -158,7 +177,7 @@ public void open(InitializationContext context) throws IOException {
         PartitionTempFileManager fileManager =
                 new PartitionTempFileManager(
                         fsFactory,
-                        tmpPath,
+                        stagingPath,
                         context.getTaskNumber(),
                         context.getAttemptNumber(),
                         outputFileConfig);
@@ -203,7 +222,7 @@ public static class Builder<T> {
        private String[] partitionColumns;
        private OutputFormatFactory<T> formatFactory;
        private TableMetaStoreFactory metaStoreFactory;
-        private Path tmpPath;
+        private Path stagingPath;
 
        private LinkedHashMap<String, String> staticPartitions = new LinkedHashMap<>();
        private boolean dynamicGrouped = false;
@@ -258,11 +277,23 @@ public Builder<T> setIsToLocal(boolean isToLocal) {
            return this;
        }
 
-        public Builder<T> setTempPath(Path tmpPath) {
-            this.tmpPath = tmpPath;
+        public Builder<T> setPath(Path parentPath) {
+            this.stagingPath = toStagingPath(parentPath);
            return this;
        }
 
+        @VisibleForTesting
+        Builder<T> setStagingPath(Path stagingPath) {
+            this.stagingPath = stagingPath;
+            return this;
+        }
+
+        private Path toStagingPath(Path parentPath) {
+            return new Path(
+                    parentPath,
+                    String.format(".staging_%d_%s", System.currentTimeMillis(), UUID.randomUUID()));
+        }
+
        public Builder<T> setPartitionComputer(PartitionComputer<T> computer) {
            this.computer = computer;
            return this;
@@ -288,15 +319,15 @@ public FileSystemOutputFormat<T> build() {
            checkNotNull(partitionColumns, "partitionColumns should not be null");
            checkNotNull(formatFactory, "formatFactory should not be null");
            checkNotNull(metaStoreFactory, "metaStoreFactory should not be null");
-            checkNotNull(tmpPath, "tmpPath should not be null");
+            checkNotNull(stagingPath, "stagingPath should not be null");
            checkNotNull(computer, "partitionComputer should not be null");
 
            return new FileSystemOutputFormat<>(
                    fileSystemFactory,
                    metaStoreFactory,
                    overwrite,
                    isToLocal,
-                    tmpPath,
+                    stagingPath,
                    partitionColumns,
                    dynamicGrouped,
                    staticPartitions,
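
The new FileSystemOutputFormat constructor refuses to reuse a pre-existing staging directory instead of silently writing into it. A self-contained sketch of that fail-fast contract, using java.nio.file rather than Flink's FileSystem abstraction (paths and names below are hypothetical):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class FailFastStagingSketch {
        // Mirrors the intent of FileSystemOutputFormat#createStagingDirectory:
        // an existing staging dir may belong to another writer, so refuse to proceed.
        static void createStagingDirectory(Path stagingPath) throws IOException {
            if (Files.exists(stagingPath)) {
                throw new IllegalStateException("Staging dir " + stagingPath + " already exists");
            }
            Files.createDirectories(stagingPath);
        }

        public static void main(String[] args) throws IOException {
            Path base = Files.createTempDirectory("demo-table"); // stand-in for the table location
            Path staging = base.resolve(".staging_demo");        // hypothetical staging dir name

            createStagingDirectory(staging);      // first writer succeeds
            try {
                createStagingDirectory(staging);  // second writer fails fast instead of sharing the dir
            } catch (IllegalStateException expected) {
                System.out.println(expected.getMessage());
            }
        }
    }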

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java

+1 -14
@@ -184,7 +184,7 @@ private DataStreamSink<RowData> createBatchSink(
                         .setMetaStoreFactory(new EmptyMetaStoreFactory(path))
                         .setOverwrite(overwrite)
                         .setStaticPartitions(staticPartitions)
-                        .setTempPath(toStagingPath())
+                        .setPath(path)
                         .setOutputFileConfig(
                                 OutputFileConfig.builder()
                                         .withPartPrefix("part-" + UUID.randomUUID())
@@ -373,19 +373,6 @@ public DynamicTableSource.DataStructureConverter createDataStructureConverter(
         };
     }
 
-    private Path toStagingPath() {
-        Path stagingDir = new Path(path, ".staging_" + System.currentTimeMillis());
-        try {
-            FileSystem fs = stagingDir.getFileSystem();
-            Preconditions.checkState(
-                    fs.exists(stagingDir) || fs.mkdirs(stagingDir),
-                    "Failed to create staging dir " + stagingDir);
-            return stagingDir;
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @SuppressWarnings("unchecked")
     private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
         Object writer = createWriter(sinkContext);
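
With toStagingPath() removed from FileSystemTableSink, createBatchSink() only hands the table path to the builder via setPath(path); the builder derives the staging directory itself, and tests can inject a fixed one through the package-private setter. A simplified, hypothetical sketch of that pattern (not the Flink API; class and method names are illustrative):

    import java.util.UUID;

    public class SinkBuilderSketch {
        private String stagingPath;

        // Production path: caller supplies only the table location.
        public SinkBuilderSketch setPath(String parentPath) {
            this.stagingPath =
                    parentPath + "/.staging_" + System.currentTimeMillis() + "_" + UUID.randomUUID();
            return this;
        }

        // Test-only override (annotated @VisibleForTesting in the real code).
        SinkBuilderSketch setStagingPath(String stagingPath) {
            this.stagingPath = stagingPath;
            return this;
        }

        public String build() {
            if (stagingPath == null) {
                throw new NullPointerException("stagingPath should not be null");
            }
            return stagingPath;
        }

        public static void main(String[] args) {
            System.out.println(new SinkBuilderSketch().setPath("/warehouse/db/t").build());
            System.out.println(new SinkBuilderSketch().setStagingPath("/tmp/fixed-staging").build());
        }
    }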
