diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java index 94f432f1c16a42..b49233374020a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java @@ -41,6 +41,8 @@ import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; @@ -50,6 +52,7 @@ @Slf4j public class ExportTaskExecutor implements TransientTaskExecutor { + private static final Logger LOG = LogManager.getLogger(ExportTaskExecutor.class); List selectStmtLists; @@ -78,22 +81,32 @@ public Long getId() { @Override public void execute() throws JobException { + LOG.debug("[Export Task] taskId: {} starting execution", taskId); if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} was already canceled before execution", taskId); throw new JobException("Export executor has been canceled, task id: {}", taskId); } + LOG.debug("[Export Task] taskId: {} updating state to EXPORTING", taskId); exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null); List outfileInfoList = Lists.newArrayList(); for (int idx = 0; idx < selectStmtLists.size(); ++idx) { + LOG.debug("[Export Task] taskId: {} processing statement {}/{}", + taskId, idx + 1, selectStmtLists.size()); if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} canceled during execution at statement {}", taskId, idx + 1); throw new JobException("Export executor has been canceled, task id: {}", taskId); } // check the version of tablets, skip if the consistency is in partition level. if (exportJob.getExportTable().isManagedTable() && !exportJob.isPartitionConsistency()) { + LOG.debug("[Export Task] taskId: {} checking tablet versions for statement {}", taskId, idx + 1); try { Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException( exportJob.getTableName().getDb()); OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl()); + LOG.debug("[Export Lock] taskId: {}, table: {} about to acquire readLock", + taskId, table.getName()); table.readLock(); + LOG.debug("[Export Lock] taskId: {}, table: {} acquired readLock", taskId, table.getName()); try { List tabletIds; LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) selectStmtLists.get(idx); @@ -108,6 +121,8 @@ public void execute() throws JobException { long nowVersion = partition.getVisibleVersion(); long oldVersion = exportJob.getPartitionToVersion().get(partition.getName()); if (nowVersion != oldVersion) { + LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock" + + "due to version mismatch", taskId, table.getName()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed"); throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}" @@ -115,11 +130,17 @@ public void execute() throws JobException { } } } catch (Exception e) { + LOG.debug("[Export Lock] taskId: {}, table: {} about to release readLock" + + "due to exception: {}", taskId, table.getName(), e.getMessage()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()); throw new JobException(e); } finally { + LOG.debug("[Export Lock] taskId: {}, table: {} releasing readLock in finally block", + taskId, table.getName()); table.readUnlock(); + LOG.debug("[Export Lock] taskId: {}, table: {} released readLock successfully", + taskId, table.getName()); } } catch (AnalysisException e) { exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, @@ -129,26 +150,39 @@ public void execute() throws JobException { } try (AutoCloseConnectContext r = buildConnectContext()) { + LOG.debug("[Export Task] taskId: {} executing statement {}", taskId, idx + 1); stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx)); stmtExecutor.execute(); if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) { + LOG.debug("[Export Task] taskId: {} failed with MySQL error: {}", taskId, + r.connectContext.getState().getErrorMessage()); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage()); return; } + LOG.debug("[Export Task] taskId: {} statement {} executed successfully", taskId, idx + 1); OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); + LOG.debug("[Export Task] taskId: {} got outfile info for statement {}:" + + "fileNumber={}, totalRows={}, fileSize={}", + taskId, idx + 1, outfileInfo.getFileNumber(), + outfileInfo.getTotalRows(), outfileInfo.getFileSize()); outfileInfoList.add(outfileInfo); } catch (Exception e) { + LOG.debug("[Export Task] taskId: {} failed with exception during statement {}: {}", + taskId, idx + 1, e.getMessage(), e); exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null, ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()); throw new JobException(e); } } if (isCanceled.get()) { + LOG.debug("[Export Task] taskId: {} canceled after processing all statements", taskId); throw new JobException("Export executor has been canceled, task id: {}", taskId); } + LOG.debug("[Export Task] taskId: {} completed successfully, updating state to FINISHED", taskId); exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null); isFinished.getAndSet(true); + LOG.debug("[Export Task] taskId: {} execution completed", taskId); } @Override diff --git a/regression-test/pipeline/p0/conf/fe.conf b/regression-test/pipeline/p0/conf/fe.conf index 2f493ff1098cb6..8c309a7d2c4ea0 100644 --- a/regression-test/pipeline/p0/conf/fe.conf +++ b/regression-test/pipeline/p0/conf/fe.conf @@ -34,7 +34,7 @@ JAVA_OPTS_FOR_JDK_17="-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m sys_log_level = INFO sys_log_mode = NORMAL -sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl +sys_log_verbose_modules = org.apache.doris.common.profile,org.apache.doris.qe.QeProcessorImpl,org.apache.doris.load.ExportTaskExecutor arrow_flight_sql_port = 8081 catalog_trash_expire_second=1 #enable ssl for test