
Commit 381e58b

[flink] Add an action/procedure to remove unexisting files from manifests (apache#4781)
1 parent 74a0894 commit 381e58b

File tree: 17 files changed (+1272 -2 lines)

docs/content/flink/procedures.md (+25 lines)

@@ -344,6 +344,31 @@ All available procedures are listed below.
    CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local')
    </td>
 </tr>
+<tr>
+   <td>remove_unexisting_files</td>
+   <td>
+      -- Use named argument<br/>
+      CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', dry_run => 'dryRun', parallelism => 'parallelism') <br/><br/>
+      -- Use indexed argument<br/>
+      CALL [catalog.]sys.remove_unexisting_files('identifier')<br/><br/>
+      CALL [catalog.]sys.remove_unexisting_files('identifier', 'dryRun', 'parallelism')
+   </td>
+   <td>
+      Procedure to remove non-existent data files from manifest entries. See <a href="https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html">Java docs</a> for detailed use cases. Arguments:
+      <li>identifier: the target table identifier. Cannot be empty; you can use database_name.* to clean a whole database.</li>
+      <li>dryRun (optional): only check which files would be removed, without actually removing them. Default is false.</li>
+      <li>parallelism (optional): the parallelism for checking files in the manifests.</li>
+      <br>
+      Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs.
+   </td>
+   <td>
+      -- remove non-existent data files from the table `mydb.myt`
+      CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
+      <br>
+      -- only check which files would be removed, without actually removing them (dry run)
+      CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
+   </td>
+</tr>
 <tr>
    <td>reset_consumer</td>
    <td>
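
For orientation, here is a minimal sketch (not part of this commit) of driving the new procedure from a Flink TableEnvironment in Java. The example class name, catalog name, and warehouse path are placeholder assumptions; the table `mydb.myt` matches the doc example above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RemoveUnexistingFilesFlinkExample {
    public static void main(String[] args) {
        // Paimon procedures are invoked from a batch environment.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Register a Paimon catalog; the warehouse path is a placeholder.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/warehouse')");
        tEnv.executeSql("USE CATALOG paimon");

        // Dry run first: report the manifest entries whose data files are gone.
        tEnv.executeSql(
                        "CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)")
                .print();

        // If the dry-run output looks right, rewrite the manifests for real.
        tEnv.executeSql("CALL sys.remove_unexisting_files(`table` => 'mydb.myt')").print();
    }
}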

docs/content/spark/procedures.md (+18 lines)

@@ -256,6 +256,24 @@ This section introduces all available Spark procedures for Paimon.
    CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')
    </td>
 </tr>
+<tr>
+   <td>remove_unexisting_files</td>
+   <td>
+      Procedure to remove non-existent data files from manifest entries. See <a href="https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html">Java docs</a> for detailed use cases. Arguments:
+      <li>identifier: the target table identifier. Cannot be empty; you can use database_name.* to clean a whole database.</li>
+      <li>dryRun (optional): only check which files would be removed, without actually removing them. Default is false.</li>
+      <li>parallelism (optional): the parallelism for checking files in the manifests.</li>
+      <br>
+      Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs.
+   </td>
+   <td>
+      -- remove non-existent data files from the table `mydb.myt`
+      CALL sys.remove_unexisting_files(table => 'mydb.myt')
+      <br>
+      -- only check which files would be removed, without actually removing them (dry run)
+      CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true)
+   </td>
+</tr>
 <tr>
    <td>repair</td>
    <td>
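
A corresponding Spark sketch (again, an assumed example, not from this commit). The builder settings are the usual Paimon Spark catalog configuration; the class name, master, catalog name, and warehouse path are placeholders. The catalog-qualified `CALL paimon.sys....` form assumes `paimon` is the registered catalog; after `USE paimon` the unqualified `CALL sys....` form from the docs works as well.

import org.apache.spark.sql.SparkSession;

public class RemoveUnexistingFilesSparkExample {
    public static void main(String[] args) {
        // Register the Paimon catalog; master and warehouse path are placeholders.
        SparkSession spark =
                SparkSession.builder()
                        .master("local[*]")
                        .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
                        .config("spark.sql.catalog.paimon.warehouse", "file:///tmp/warehouse")
                        .config(
                                "spark.sql.extensions",
                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                        .getOrCreate();

        // Dry run: show which manifest entries point at missing files.
        spark.sql("CALL paimon.sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true)")
                .show();

        // Rewrite the manifests for real.
        spark.sql("CALL paimon.sys.remove_unexisting_files(table => 'mydb.myt')").show();

        spark.stop();
    }
}
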
org/apache/paimon/operation/ListUnexistingFiles.java (new file, +104 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.operation;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ThreadPoolUtils;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;

import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;

/** Lists which data files recorded in manifests are missing from the filesystem. */
public class ListUnexistingFiles {

    private final FileStoreTable table;
    private final FileStorePathFactory pathFactory;
    private final ThreadPoolExecutor executor;

    public ListUnexistingFiles(FileStoreTable table) {
        this.table = table;
        this.pathFactory = table.store().pathFactory();
        this.executor =
                createCachedThreadPool(
                        table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES");
    }

    public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws Exception {
        Map<Integer, Map<String, DataFileMeta>> result = new HashMap<>();
        // Plan a scan restricted to this partition, then check each split's files in parallel.
        List<Split> splits =
                table.newScan()
                        .withPartitionFilter(Collections.singletonList(partition))
                        .plan()
                        .splits();
        Iterator<ListResult> it =
                ThreadPoolUtils.randomlyExecuteSequentialReturn(
                        executor, split -> listFilesInDataSplit((DataSplit) split), splits);
        while (it.hasNext()) {
            ListResult item = it.next();
            // Group missing files by bucket: bucket -> (file path -> metadata).
            result.computeIfAbsent(item.bucket, k -> new HashMap<>()).put(item.path, item.meta);
        }
        return result;
    }

    private List<ListResult> listFilesInDataSplit(DataSplit dataSplit) {
        List<ListResult> results = new ArrayList<>();
        DataFilePathFactory dataFilePathFactory =
                pathFactory.createDataFilePathFactory(dataSplit.partition(), dataSplit.bucket());
        for (DataFileMeta meta : dataSplit.dataFiles()) {
            Path path = dataFilePathFactory.toPath(meta);
            try {
                // A manifest entry whose data file no longer exists is reported as missing.
                if (!table.fileIO().exists(path)) {
                    results.add(new ListResult(dataSplit.bucket(), path.toString(), meta));
                }
            } catch (IOException e) {
                throw new UncheckedIOException("Cannot determine if file " + path + " exists.", e);
            }
        }
        return results;
    }

    /** A missing file: its bucket, full path, and manifest metadata. */
    private static class ListResult {

        private final int bucket;
        private final String path;
        private final DataFileMeta meta;

        private ListResult(int bucket, String path, DataFileMeta meta) {
            this.bucket = bucket;
            this.path = path;
            this.meta = meta;
        }
    }
}
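
A minimal usage sketch for this class (an assumed example, not part of the commit). It mirrors what the test below does: the partition key is built with BinaryRowWriter, assuming a FileStoreTable whose single partition column is an INT.

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.ListUnexistingFiles;
import org.apache.paimon.table.FileStoreTable;

import java.util.Map;

public class ListUnexistingFilesExample {

    // Prints files recorded in the manifests of partition pt = 0 but gone from disk.
    // Assumes `table` has a single INT partition column, as in the test below.
    static void printMissingFiles(FileStoreTable table) throws Exception {
        BinaryRow partition = new BinaryRow(1);
        BinaryRowWriter writer = new BinaryRowWriter(partition);
        writer.writeInt(0, 0); // pt = 0
        writer.complete();

        Map<Integer, Map<String, DataFileMeta>> missing =
                new ListUnexistingFiles(table).list(partition);
        missing.forEach(
                (bucket, files) ->
                        files.keySet().forEach(p -> System.out.println("bucket " + bucket + ": " + p)));
    }
}
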
org/apache/paimon/operation/ListUnexistingFilesTest.java (new file, +181 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.operation;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ListUnexistingFiles}. */
public class ListUnexistingFilesTest {

    @TempDir java.nio.file.Path tempDir;

    @ParameterizedTest
    @ValueSource(ints = {-1, 3})
    public void testListFiles(int bucket) throws Exception {
        int numPartitions = 2;
        int numFiles = 10;
        int[] numDeletes = new int[numPartitions];
        FileStoreTable table =
                prepareRandomlyDeletedTable(
                        tempDir.toString(), "mydb", "t", bucket, numFiles, numDeletes);

        // Builds the BinaryRow partition key for pt = i.
        Function<Integer, BinaryRow> binaryRow =
                i -> {
                    BinaryRow b = new BinaryRow(1);
                    BinaryRowWriter writer = new BinaryRowWriter(b);
                    writer.writeInt(0, i);
                    writer.complete();
                    return b;
                };

        ListUnexistingFiles operation = new ListUnexistingFiles(table);
        for (int i = 0; i < numPartitions; i++) {
            Map<Integer, Map<String, DataFileMeta>> result = operation.list(binaryRow.apply(i));
            assertThat(result.values().stream().mapToInt(Map::size).sum()).isEqualTo(numDeletes[i]);
        }
    }

    public static FileStoreTable prepareRandomlyDeletedTable(
            String warehouse,
            String databaseName,
            String tableName,
            int bucket,
            int numFiles,
            int[] numDeletes)
            throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
                        new String[] {"pt", "id", "v"});
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.BUCKET.key(), String.valueOf(bucket));
        options.put(CoreOptions.WRITE_ONLY.key(), "true");
        if (bucket > 0) {
            options.put(CoreOptions.BUCKET_KEY.key(), "id");
        }
        FileStoreTable table =
                createPaimonTable(
                        warehouse,
                        databaseName,
                        tableName,
                        rowType,
                        Collections.singletonList("pt"),
                        options);

        String commitUser = UUID.randomUUID().toString();
        TableWriteImpl<?> write = table.newWrite(commitUser);
        TableCommitImpl commit = table.newCommit(commitUser);

        ThreadLocalRandom random = ThreadLocalRandom.current();
        int numPartitions = numDeletes.length;
        for (int i = 0; i < numPartitions; i++) {
            numDeletes[i] = random.nextInt(0, numFiles + 1);
        }

        // One commit per record so that each record lands in its own data file.
        int identifier = 0;
        for (int i = 0; i < numPartitions; i++) {
            for (int j = 0; j < numFiles; j++) {
                write.write(GenericRow.of(i, random.nextInt(), random.nextLong()));
                identifier++;
                commit.commit(identifier, write.prepareCommit(false, identifier));
            }
        }

        write.close();
        commit.close();

        // Delete numDeletes[i] random data files from partition i, bypassing Paimon,
        // to simulate files that vanished from the filesystem.
        for (int i = 0; i < numPartitions; i++) {
            LocalFileIO fileIO = LocalFileIO.create();
            List<Path> paths = new ArrayList<>();
            for (int j = 0; j < Math.max(1, bucket); j++) {
                Path path = new Path(table.location(), "pt=" + i + "/bucket-" + j);
                paths.addAll(
                        Arrays.stream(fileIO.listStatus(path))
                                .map(FileStatus::getPath)
                                .collect(Collectors.toList()));
            }
            Collections.shuffle(paths);
            for (int j = 0; j < numDeletes[i]; j++) {
                fileIO.deleteQuietly(paths.get(j));
            }
        }

        return table;
    }

    private static FileStoreTable createPaimonTable(
            String warehouse,
            String databaseName,
            String tableName,
            RowType rowType,
            List<String> partitionKeys,
            Map<String, String> customOptions)
            throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path path = new Path(warehouse);

        Schema schema =
                new Schema(
                        rowType.getFields(),
                        partitionKeys,
                        Collections.emptyList(),
                        customOptions,
                        "");

        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
            paimonCatalog.createDatabase(databaseName, true);
            Identifier paimonIdentifier = Identifier.create(databaseName, tableName);
            paimonCatalog.createTable(paimonIdentifier, schema, false);
            return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
        }
    }
}
