Skip to content

Commit 1a5915d

Browse files
authored
[core] Introduce SnapshotCommit to abstract atomically commit (apache#4911)
1 parent ee023b0 commit 1a5915d

27 files changed

+610
-159
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.paimon;
2020

2121
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
22+
import org.apache.paimon.catalog.RenamingSnapshotCommit;
23+
import org.apache.paimon.catalog.SnapshotCommit;
2224
import org.apache.paimon.data.InternalRow;
2325
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
2426
import org.apache.paimon.fs.FileIO;
@@ -31,6 +33,7 @@
3133
import org.apache.paimon.metastore.AddPartitionTagCallback;
3234
import org.apache.paimon.operation.ChangelogDeletion;
3335
import org.apache.paimon.operation.FileStoreCommitImpl;
36+
import org.apache.paimon.operation.Lock;
3437
import org.apache.paimon.operation.ManifestsReader;
3538
import org.apache.paimon.operation.PartitionExpire;
3639
import org.apache.paimon.operation.SnapshotDeletion;
@@ -261,7 +264,14 @@ public FileStoreCommitImpl newCommit(String commitUser) {
261264

262265
@Override
263266
public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> callbacks) {
267+
SnapshotManager snapshotManager = snapshotManager();
268+
SnapshotCommit snapshotCommit = catalogEnvironment.snapshotCommit(snapshotManager);
269+
if (snapshotCommit == null) {
270+
snapshotCommit =
271+
new RenamingSnapshotCommit(snapshotManager, Lock.emptyFactory().create());
272+
}
264273
return new FileStoreCommitImpl(
274+
snapshotCommit,
265275
fileIO,
266276
schemaManager,
267277
tableName,
@@ -270,7 +280,7 @@ public FileStoreCommitImpl newCommit(String commitUser, List<CommitCallback> cal
270280
options,
271281
options.partitionDefaultName(),
272282
pathFactory(),
273-
snapshotManager(),
283+
snapshotManager,
274284
manifestFileFactory(),
275285
manifestListFactory(),
276286
indexManifestFileFactory(),

paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.paimon.fs.FileStatus;
2525
import org.apache.paimon.fs.Path;
2626
import org.apache.paimon.operation.FileStoreCommit;
27-
import org.apache.paimon.operation.Lock;
2827
import org.apache.paimon.options.Options;
2928
import org.apache.paimon.partition.Partition;
3029
import org.apache.paimon.schema.Schema;
@@ -367,9 +366,10 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>
367366

368367
@Override
369368
public Table getTable(Identifier identifier) throws TableNotExistException {
370-
Lock.Factory lockFactory =
371-
Lock.factory(lockFactory().orElse(null), lockContext().orElse(null), identifier);
372-
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, lockFactory);
369+
SnapshotCommit.Factory commitFactory =
370+
new RenamingSnapshotCommit.Factory(
371+
lockFactory().orElse(null), lockContext().orElse(null));
372+
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, commitFactory);
373373
}
374374

375375
/**

paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.fs.Path;
2525
import org.apache.paimon.manifest.PartitionEntry;
26-
import org.apache.paimon.operation.Lock;
2726
import org.apache.paimon.options.Options;
2827
import org.apache.paimon.partition.Partition;
2928
import org.apache.paimon.schema.SchemaManager;
@@ -173,7 +172,7 @@ public static Table loadTable(
173172
Catalog catalog,
174173
Identifier identifier,
175174
TableMetadata.Loader metadataLoader,
176-
Lock.Factory lockFactory)
175+
SnapshotCommit.Factory commitFactory)
177176
throws Catalog.TableNotExistException {
178177
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
179178
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -188,7 +187,7 @@ public static Table loadTable(
188187

189188
CatalogEnvironment catalogEnv =
190189
new CatalogEnvironment(
191-
identifier, metadata.uuid(), lockFactory, catalog.catalogLoader());
190+
identifier, metadata.uuid(), catalog.catalogLoader(), commitFactory);
192191
Path path = new Path(schema.options().get(PATH.key()));
193192
FileStoreTable table =
194193
FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv);

paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java

+32-11
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.util.List;
3434
import java.util.Map;
35+
import java.util.Optional;
3536
import java.util.concurrent.Callable;
3637

3738
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
@@ -117,20 +118,30 @@ protected void dropTableImpl(Identifier identifier) {
117118

118119
@Override
119120
public void createTableImpl(Identifier identifier, Schema schema) {
120-
uncheck(() -> schemaManager(identifier).createTable(schema));
121+
SchemaManager schemaManager = schemaManager(identifier);
122+
try {
123+
runWithLock(identifier, () -> uncheck(() -> schemaManager.createTable(schema)));
124+
} catch (RuntimeException e) {
125+
throw e;
126+
} catch (Exception e) {
127+
throw new RuntimeException(e);
128+
}
121129
}
122130

123-
private SchemaManager schemaManager(Identifier identifier) {
124-
Path path = getTableLocation(identifier);
125-
CatalogLock catalogLock =
126-
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
127-
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
128-
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
131+
public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exception {
132+
Optional<CatalogLockFactory> lockFactory = lockFactory();
133+
try (Lock lock =
134+
lockFactory
135+
.map(factory -> factory.createLock(lockContext().orElse(null)))
136+
.map(l -> Lock.fromCatalog(l, identifier))
137+
.orElseGet(() -> Lock.emptyFactory().create())) {
138+
return lock.runWithLock(callable);
139+
}
129140
}
130141

131-
private CatalogLockContext assertGetLockContext() {
132-
return lockContext()
133-
.orElseThrow(() -> new RuntimeException("No lock context when lock is enabled."));
142+
private SchemaManager schemaManager(Identifier identifier) {
143+
Path path = getTableLocation(identifier);
144+
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault());
134145
}
135146

136147
@Override
@@ -143,7 +154,17 @@ public void renameTableImpl(Identifier fromTable, Identifier toTable) {
143154
@Override
144155
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
145156
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
146-
schemaManager(identifier).commitChanges(changes);
157+
SchemaManager schemaManager = schemaManager(identifier);
158+
try {
159+
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
160+
} catch (TableNotExistException
161+
| ColumnAlreadyExistException
162+
| ColumnNotExistException
163+
| RuntimeException e) {
164+
throw e;
165+
} catch (Exception e) {
166+
throw new RuntimeException(e);
167+
}
147168
}
148169

149170
protected static <T> T uncheck(Callable<T> callable) {

paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public Identifier(
9292
this.database = database;
9393

9494
StringBuilder builder = new StringBuilder(table);
95-
if (branch != null) {
95+
if (branch != null && !"main".equalsIgnoreCase(branch)) {
9696
builder.append(Catalog.SYSTEM_TABLE_SPLITTER)
9797
.append(Catalog.SYSTEM_BRANCH_PREFIX)
9898
.append(branch);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.annotation.VisibleForTesting;
23+
import org.apache.paimon.fs.FileIO;
24+
import org.apache.paimon.fs.Path;
25+
import org.apache.paimon.operation.Lock;
26+
import org.apache.paimon.utils.SnapshotManager;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.Optional;
31+
import java.util.concurrent.Callable;
32+
33+
/**
34+
* A {@link SnapshotCommit} using file renaming to commit.
35+
*
36+
* <p>Note that when the file system is local or HDFS, rename is atomic. But if the file system is
37+
* object storage, we need additional lock protection.
38+
*/
39+
public class RenamingSnapshotCommit implements SnapshotCommit {
40+
41+
private final SnapshotManager snapshotManager;
42+
private final FileIO fileIO;
43+
private final Lock lock;
44+
45+
public RenamingSnapshotCommit(SnapshotManager snapshotManager, Lock lock) {
46+
this.snapshotManager = snapshotManager;
47+
this.fileIO = snapshotManager.fileIO();
48+
this.lock = lock;
49+
}
50+
51+
@Override
52+
public boolean commit(Snapshot snapshot, String branch) throws Exception {
53+
Path newSnapshotPath =
54+
snapshotManager.branch().equals(branch)
55+
? snapshotManager.snapshotPath(snapshot.id())
56+
: snapshotManager.copyWithBranch(branch).snapshotPath(snapshot.id());
57+
58+
Callable<Boolean> callable =
59+
() -> {
60+
boolean committed = fileIO.tryToWriteAtomic(newSnapshotPath, snapshot.toJson());
61+
if (committed) {
62+
snapshotManager.commitLatestHint(snapshot.id());
63+
}
64+
return committed;
65+
};
66+
return lock.runWithLock(
67+
() ->
68+
// fs.rename may not returns false if target file
69+
// already exists, or even not atomic
70+
// as we're relying on external locking, we can first
71+
// check if file exist then rename to work around this
72+
// case
73+
!fileIO.exists(newSnapshotPath) && callable.call());
74+
}
75+
76+
@Override
77+
public void close() throws Exception {
78+
this.lock.close();
79+
}
80+
81+
/** Factory to create {@link RenamingSnapshotCommit}. */
82+
public static class Factory implements SnapshotCommit.Factory {
83+
84+
private static final long serialVersionUID = 1L;
85+
86+
@Nullable private final CatalogLockFactory lockFactory;
87+
@Nullable private final CatalogLockContext lockContext;
88+
89+
public Factory(
90+
@Nullable CatalogLockFactory lockFactory,
91+
@Nullable CatalogLockContext lockContext) {
92+
this.lockFactory = lockFactory;
93+
this.lockContext = lockContext;
94+
}
95+
96+
@Override
97+
public SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager) {
98+
Lock lock =
99+
Optional.ofNullable(lockFactory)
100+
.map(factory -> factory.createLock(lockContext))
101+
.map(l -> Lock.fromCatalog(l, identifier))
102+
.orElseGet(() -> Lock.emptyFactory().create());
103+
return new RenamingSnapshotCommit(snapshotManager, lock);
104+
}
105+
106+
@VisibleForTesting
107+
@Nullable
108+
public CatalogLockFactory lockFactory() {
109+
return lockFactory;
110+
}
111+
112+
@VisibleForTesting
113+
@Nullable
114+
public CatalogLockContext lockContext() {
115+
return lockContext;
116+
}
117+
}
118+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.utils.SnapshotManager;
23+
24+
import java.io.Serializable;
25+
26+
/** Interface to commit snapshot atomically. */
27+
public interface SnapshotCommit extends AutoCloseable {
28+
29+
boolean commit(Snapshot snapshot, String branch) throws Exception;
30+
31+
/** Factory to create {@link SnapshotCommit}. */
32+
interface Factory extends Serializable {
33+
SnapshotCommit create(Identifier identifier, SnapshotManager snapshotManager);
34+
}
35+
}

paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java

+18-7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import java.util.Map;
5858
import java.util.Optional;
5959
import java.util.Set;
60+
import java.util.concurrent.Callable;
6061
import java.util.stream.Collectors;
6162

6263
import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout;
@@ -289,7 +290,8 @@ protected void dropTableImpl(Identifier identifier) {
289290
protected void createTableImpl(Identifier identifier, Schema schema) {
290291
try {
291292
// create table file
292-
getSchemaManager(identifier).createTable(schema);
293+
SchemaManager schemaManager = getSchemaManager(identifier);
294+
runWithLock(identifier, () -> schemaManager.createTable(schema));
293295
// Update schema metadata
294296
Path path = getTableLocation(identifier);
295297
int insertRecord =
@@ -350,10 +352,19 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
350352

351353
@Override
352354
protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
353-
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException {
355+
throws ColumnAlreadyExistException, TableNotExistException, ColumnNotExistException {
354356
assertMainBranch(identifier);
355357
SchemaManager schemaManager = getSchemaManager(identifier);
356-
schemaManager.commitChanges(changes);
358+
try {
359+
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
360+
} catch (TableNotExistException
361+
| ColumnAlreadyExistException
362+
| ColumnNotExistException
363+
| RuntimeException e) {
364+
throw e;
365+
} catch (Exception e) {
366+
throw new RuntimeException("Failed to alter table " + identifier.getFullName(), e);
367+
}
357368
}
358369

359370
@Override
@@ -384,17 +395,17 @@ public Optional<CatalogLockContext> lockContext() {
384395
return Optional.of(new JdbcCatalogLockContext(catalogKey, options));
385396
}
386397

387-
private Lock lock(Identifier identifier) {
398+
public <T> T runWithLock(Identifier identifier, Callable<T> callable) throws Exception {
388399
if (!lockEnabled()) {
389-
return new Lock.EmptyLock();
400+
return callable.call();
390401
}
391402
JdbcCatalogLock lock =
392403
new JdbcCatalogLock(
393404
connections,
394405
catalogKey,
395406
checkMaxSleep(options.toMap()),
396407
acquireTimeout(options.toMap()));
397-
return Lock.fromCatalog(lock, identifier);
408+
return Lock.fromCatalog(lock, identifier).runWithLock(callable);
398409
}
399410

400411
@Override
@@ -403,7 +414,7 @@ public void close() throws Exception {
403414
}
404415

405416
private SchemaManager getSchemaManager(Identifier identifier) {
406-
return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier));
417+
return new SchemaManager(fileIO, getTableLocation(identifier));
407418
}
408419

409420
private Map<String, String> fetchProperties(String databaseName) {

0 commit comments

Comments
 (0)