Skip to content

Commit 852fe10

Browse files
authored
Fix XA Transaction bug (apache#5020)
1 parent a2d2f2d commit 852fe10

File tree

6 files changed

+47
-31
lines changed

6 files changed

+47
-31
lines changed

seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/SeaTunnelRuntimeException.java

+1-14
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.seatunnel.common.exception;
1919

20-
import java.io.PrintWriter;
21-
import java.io.StringWriter;
22-
2320
/** SeaTunnel global exception, used to tell user more clearly error messages */
2421
public class SeaTunnelRuntimeException extends RuntimeException {
2522
private final SeaTunnelErrorCode seaTunnelErrorCode;
@@ -36,17 +33,7 @@ public SeaTunnelRuntimeException(
3633
}
3734

3835
public SeaTunnelRuntimeException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
39-
super(seaTunnelErrorCode.getErrorMessage() + " - " + getMessageFromThrowable(cause));
36+
super(seaTunnelErrorCode.getErrorMessage(), cause);
4037
this.seaTunnelErrorCode = seaTunnelErrorCode;
4138
}
42-
43-
public static String getMessageFromThrowable(Throwable cause) {
44-
if (cause == null) {
45-
return "";
46-
}
47-
StringWriter stringWriter = new StringWriter();
48-
PrintWriter printWriter = new PrintWriter(stringWriter);
49-
cause.printStackTrace(printWriter);
50-
return stringWriter.toString();
51-
}
5239
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java

+1
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public static JdbcConnectionConfig of(ReadonlyConfig config) {
5858
builder.xaDataSourceClassName(config.get(JdbcOptions.XA_DATA_SOURCE_CLASS_NAME));
5959
builder.maxCommitAttempts(config.get(JdbcOptions.MAX_COMMIT_ATTEMPTS));
6060
builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC));
61+
builder.maxRetries(0);
6162
}
6263

6364
config.getOptional(JdbcOptions.USER).ifPresent(builder::username);

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcExactlyOnceSinkWriter.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
import com.google.common.base.Throwables;
45+
4446
import javax.transaction.xa.Xid;
4547

4648
import java.io.IOException;
@@ -137,11 +139,22 @@ public void write(SeaTunnelRow element) {
137139
@Override
138140
public Optional<XidInfo> prepareCommit() throws IOException {
139141
tryOpen();
140-
prepareCurrentTx();
142+
143+
boolean emptyXaTransaction = false;
144+
try {
145+
prepareCurrentTx();
146+
} catch (Exception e) {
147+
if (Throwables.getRootCause(e) instanceof XaFacade.EmptyXaTransactionException) {
148+
emptyXaTransaction = true;
149+
LOG.info("skip prepare empty xa transaction, xid={}", currentXid);
150+
} else {
151+
throw e;
152+
}
153+
}
141154
this.currentXid = null;
142155
beginTx();
143156
checkState(prepareXid != null, "prepare xid must not be null");
144-
return Optional.of(new XidInfo(prepareXid, 0));
157+
return emptyXaTransaction ? Optional.empty() : Optional.of(new XidInfo(prepareXid, 0));
145158
}
146159

147160
@Override
@@ -186,14 +199,22 @@ private void beginTx() throws IOException {
186199
private void prepareCurrentTx() throws IOException {
187200
checkState(currentXid != null, "no current xid");
188201
outputFormat.flush();
202+
203+
Exception endAndPrepareException = null;
189204
try {
190205
xaFacade.endAndPrepare(currentXid);
191-
prepareXid = currentXid;
192206
} catch (Exception e) {
207+
endAndPrepareException = e;
193208
throw new JdbcConnectorException(
194209
JdbcConnectorErrorCode.XA_OPERATION_FAILED,
195210
"unable to prepare current xa transaction",
196211
e);
212+
} finally {
213+
if (endAndPrepareException == null
214+
|| Throwables.getRootCause(endAndPrepareException)
215+
instanceof XaFacade.EmptyXaTransactionException) {
216+
prepareXid = currentXid;
217+
}
197218
}
198219
}
199220
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java

+4
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo;
2929
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
3030

31+
import lombok.extern.slf4j.Slf4j;
32+
3133
import java.io.IOException;
3234
import java.util.ArrayList;
3335
import java.util.List;
3436
import java.util.stream.Collectors;
3537

38+
@Slf4j
3639
public class JdbcSinkAggregatedCommitter
3740
implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
3841

@@ -67,6 +70,7 @@ public List<JdbcAggregatedCommitInfo> commit(
6770
return aggregatedCommitInfos.stream()
6871
.map(
6972
aggregatedCommitInfo -> {
73+
log.info("commit xid: " + aggregatedCommitInfo.getXidInfoList());
7074
GroupXaOperationResult<XidInfo> result =
7175
xaGroupOps.commit(
7276
new ArrayList<>(aggregatedCommitInfo.getXidInfoList()),

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -171,18 +171,19 @@ public void received(Record<?> record) {
171171
serializeStates(writerStateSerializer.get(), states));
172172
}
173173
if (containAggCommitter) {
174-
lastCommitInfo.ifPresent(
175-
commitInfoT ->
176-
runningTask
177-
.getExecutionContext()
178-
.sendToMember(
179-
new SinkPrepareCommitOperation(
180-
barrier,
181-
committerTaskLocation,
182-
SerializationUtils.serialize(
183-
commitInfoT)),
184-
committerTaskAddress)
185-
.join());
174+
CommitInfoT commitInfoT = null;
175+
if (lastCommitInfo.isPresent()) {
176+
commitInfoT = lastCommitInfo.get();
177+
}
178+
runningTask
179+
.getExecutionContext()
180+
.sendToMember(
181+
new SinkPrepareCommitOperation(
182+
barrier,
183+
committerTaskLocation,
184+
SerializationUtils.serialize(commitInfoT)),
185+
committerTaskAddress)
186+
.join();
186187
}
187188
} else {
188189
if (containAggCommitter) {

seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/sink/SinkPrepareCommitOperation.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@ public void run() throws Exception {
7979
taskExecutionService
8080
.getExecutionContext(taskLocation.getTaskGroupLocation())
8181
.getClassLoader();
82-
committerTask.receivedWriterCommitInfo(
83-
barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
82+
if (commitInfos != null) {
83+
committerTask.receivedWriterCommitInfo(
84+
barrier.getId(), SerializationUtils.deserialize(commitInfos, classLoader));
85+
}
8486
committerTask.triggerBarrier(barrier);
8587
}
8688
}

0 commit comments

Comments
 (0)