diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java index eafb0575068..4137fc377a0 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java @@ -91,7 +91,8 @@ public DagNodeId getDagNodeId() { boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException, SQLException; /** - * Persist the dag action in {@link DagActionStore} for durability + * Persist the dag action in {@link DagActionStore} for durability. Throws exception for failed insert due to a + * duplicate key error. * @param flowGroup flow group for the dag action * @param flowName flow name for the dag action * @param flowExecutionId flow execution for the dag action @@ -102,7 +103,21 @@ public DagNodeId getDagNodeId() { void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType) throws IOException; /** - * Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName. + * Persist the dag action in {@link DagActionStore} for durability + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param jobName job name for the dag action + * @param dagActionType the value of the dag action + * @param ignoreDuplicates boolean value used to indicate whether duplicate insertions will result in an exception + * being thrown or ignored + * @throws IOException + */ + void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType, boolean ignoreDuplicates) throws IOException; + + /** + * Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName and throws + * * exception for a failed insert to a duplicate key error. * @param flowGroup flow group for the dag action * @param flowName flow name for the dag action * @param flowExecutionId flow execution for the dag action @@ -111,6 +126,18 @@ public DagNodeId getDagNodeId() { */ void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException; + /** + * Persist the dag action in {@link DagActionStore} for durability. This method assumes an empty jobName. + * @param flowGroup flow group for the dag action + * @param flowName flow name for the dag action + * @param flowExecutionId flow execution for the dag action + * @param dagActionType the value of the dag action + * @param ignoreDuplicates boolean value used to indicate whether duplicate insertions will result in an exception + * being thrown or ignored + * @throws IOException + */ + void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType, boolean ignoreDuplicates) throws IOException; + /** * delete the dag action from {@link DagActionStore} * @param DagAction containing all information needed to identify dag and specific action value diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java index c41bfe81f31..13319fc1153 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java @@ -130,12 +130,14 @@ public void handleFlowLaunchTriggerEvent(Properties jobProps, DagActionStore.Dag // Otherwise leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything } - // Called after obtaining a lease to persist the dag action to {@link DagActionStore} and mark the lease as done + /* Called after obtaining a lease to persist the dag action to {@link DagActionStore} and mark the lease as done. + If a duplicate dag action already exists in the store, the lease is still marked as the action need only occur once. + */ private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus leaseStatus) { if (this.dagActionStore.isPresent()) { try { DagActionStore.DagAction dagAction = leaseStatus.getDagAction(); - this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(), dagAction.getDagActionType()); + this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), dagAction.getFlowName(), dagAction.getFlowExecutionId(), dagAction.getDagActionType(), true); // If the dag action has been persisted to the {@link DagActionStore} we can close the lease this.numFlowsSubmitted.mark(); return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java index f0b93e0595c..b98ea880c57 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java @@ -53,8 +53,10 @@ public class MysqlDagActionStore implements DagActionStore { private String thisTableRetentionStatement; private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE flow_group = ? AND flow_name = ? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?)"; - protected static final String INSERT_STATEMENT = "INSERT INTO %s (flow_group, flow_name, flow_execution_id, job_name, dag_action) " - + "VALUES (?, ?, ?, ?, ?)"; + protected static final String INSERT_STATEMENT = "INSERT INTO %s " + + "(flow_group, flow_name, flow_execution_id, job_name, dag_action) VALUES (?, ?, ?, ?, ?)"; + protected static final String INSERT_IGNORE_DUPLICATES_STATEMENT = "INSERT IGNORE INTO %s " + + "(flow_group, flow_name, flow_execution_id, job_name, dag_action) VALUES (?, ?, ?, ?, ?)"; private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?"; private static final String GET_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, job_name, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND job_name = ? AND dag_action = ?"; private static final String GET_ALL_STATEMENT = "SELECT flow_group, flow_name, flow_execution_id, job_name, dag_action FROM %s"; @@ -127,9 +129,18 @@ public boolean exists(String flowGroup, String flowName, String flowExecutionId, } @Override - public void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, DagActionType dagActionType) + public void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionType dagActionType) throws IOException { - dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { + addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType, false); + } + + @Override + public void addJobDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName, + DagActionType dagActionType, boolean ignoreDuplicates) throws IOException { + dbStatementExecutor.withPreparedStatement( + String.format(ignoreDuplicates ? INSERT_IGNORE_DUPLICATES_STATEMENT : INSERT_STATEMENT, tableName), + insertStatement -> { try { int i = 0; insertStatement.setString(++i, flowGroup); @@ -147,7 +158,13 @@ public void addJobDagAction(String flowGroup, String flowName, String flowExecut @Override public void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType) throws IOException { - addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType); + addFlowDagAction(flowGroup, flowName, flowExecutionId, dagActionType, false); + } + + @Override + public void addFlowDagAction(String flowGroup, String flowName, String flowExecutionId, DagActionType dagActionType, + boolean ignoreDuplicates) throws IOException { + addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType, ignoreDuplicates); } @Override