From ea1aa48875fb714ecc9de704c7261bf2ab131fc9 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 14 Oct 2025 17:49:00 +0200 Subject: [PATCH 1/4] Get exit code from google API Signed-off-by: jorgee --- .../batch/GoogleBatchTaskHandler.groovy | 27 +++++- .../batch/GoogleBatchTaskHandlerTest.groovy | 88 +++++++++++++++++-- 2 files changed, 107 insertions(+), 8 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 2b67dfd75b..01ba07e1ac 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -545,7 +545,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { if( state in COMPLETED ) { log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - terminated job=$jobId; task=$taskId; state=$state" // finalize the task - task.exitStatus = readExitFile() + task.exitStatus = getExitCode() if( state == 'FAILED' ) { if( task.exitStatus == Integer.MAX_VALUE ) task.error = getJobError() @@ -565,6 +565,29 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { return false } + /** + * Try to get the latest exit code form the task status events list. + * Fallback to read .exitcode file generated by Nextflow if not found (null). + * The rationale of this is that, in case of error, the exit code return by the batch API is more reliable. + * + * @return exit code if found, otherwise Integer.MAX_VALUE + */ + private Integer getExitCode(){ + final events = client.getTaskStatus(jobId, taskId)?.getStatusEventsList() + if( events ) { + log.debug("[GOOGLE BATCH] Getting exit code from events: $events") + final batchExitCode = events.stream().filter(ev -> ev.hasTaskExecution()) + .max( (ev1, ev2) -> Long.compare(ev1.getEventTime().seconds, ev2.getEventTime().seconds) ) + .map(ev -> ev.getTaskExecution().getExitCode()) + .orElse(null) + if( batchExitCode != null && batchExitCode < 50000) // Ignore 500XX codes, they will be managed later. + return batchExitCode + } + // fallback to read + log.debug("[GOOGLE BATCH] Exit code not found from API. Checking .exitcode file...") + return readExitFile() + } + protected Throwable getJobError() { try { final events = noTaskJobfailure @@ -574,7 +597,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}" final error = lastEvent?.description - if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) { + if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) || lastEvent?.taskExecution?.exitCode > 50000) { return new ProcessException(error) } } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index aa27e0e81e..5918937fb1 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -21,7 +21,10 @@ import com.google.api.gax.grpc.GrpcStatusCode import com.google.api.gax.rpc.NotFoundException import com.google.cloud.batch.v1.JobStatus import com.google.cloud.batch.v1.Task +import com.google.cloud.batch.v1.TaskExecution import io.grpc.Status +import nextflow.cloud.google.batch.logging.BatchLogging +import nextflow.exception.ProcessException import java.nio.file.Path @@ -473,15 +476,18 @@ class GoogleBatchTaskHandlerTest extends Specification { } - TaskStatus makeTaskStatus(TaskStatus.State state, String desc) { + TaskStatus makeTaskStatus(TaskStatus.State state, String desc, Integer exitCode = null) { def builder = TaskStatus.newBuilder() if (state) builder.setState(state) - if (desc) - builder.addStatusEvents( - StatusEvent.newBuilder() - .setDescription(desc) - ) + if (desc || exitCode != null) { + def statusBuilder = StatusEvent.newBuilder() + if (desc) + statusBuilder.setDescription(desc) + if (exitCode != null) + statusBuilder.setTaskExecution(TaskExecution.newBuilder().setExitCode(exitCode).build()) + builder.addStatusEvents(statusBuilder.build()) + } builder.build() } @@ -665,4 +671,74 @@ class GoogleBatchTaskHandlerTest extends Specification { .build() } + + def 'should check if completed from task status' () { + given: + def jobId = '1' + def taskId = '1' + def client = Mock(BatchClient){ + getTaskInArrayStatus(jobId, taskId) >> makeTaskStatus(STATE,"", EXIT_CODE) + getTaskStatus(jobId, taskId) >> makeTaskStatus(STATE,"", EXIT_CODE) + getJobStatus(jobId) >> makeJobStatus(JOB_STATUS,"") + } + def logging = Mock(BatchLogging) + def executor = Mock(GoogleBatchExecutor){ + getLogging() >> logging + } + def task = new TaskRun() + task.name = 'hello' + def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor)) + when: + def result = handler.checkIfCompleted() + then: + handler.status == TASK_STATUS + handler.task.exitStatus == EXIT_STATUS + result == RESULT + + where: + JOB_STATUS | STATE | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT + JobStatus.State.SUCCEEDED | TaskStatus.State.SUCCEEDED | 0 | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true + JobStatus.State.FAILED | TaskStatus.State.FAILED | 1 | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true + JobStatus.State.RUNNING | TaskStatus.State.RUNNING | null | true | nextflow.processor.TaskStatus.RUNNING | Integer.MAX_VALUE | false + JobStatus.State.SUCCEEDED | TaskStatus.State.SUCCEEDED | 0 | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true + JobStatus.State.FAILED | TaskStatus.State.FAILED | 1 | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true + JobStatus.State.RUNNING | TaskStatus.State.RUNNING | null | false | nextflow.processor.TaskStatus.RUNNING | Integer.MAX_VALUE | false + + } + + def 'should check if completed from read file' () { + given: + def jobId = '1' + def taskId = '1' + def client = Mock(BatchClient){ + getTaskInArrayStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null } + getTaskStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null } + getJobStatus(jobId ) >> makeJobStatus(JobStatus.State.FAILED,DESC) + } + def logging = Mock(BatchLogging) + def executor = Mock(GoogleBatchExecutor){ + getLogging() >> logging + } + def task = new TaskRun() + task.name = 'hello' + def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor)) + when: + def result = handler.checkIfCompleted() + then: + 1 * handler.readExitFile() >> EXIT_STATUS + handler.status == TASK_STATUS + handler.task.exitStatus == EXIT_STATUS + handler.task.error?.message == TASK_ERROR + result == RESULT + + where: + TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR + TaskStatus.State.FAILED | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null + null | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null + TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | true | nextflow.processor.TaskStatus.COMPLETED | Integer.MAX_VALUE | true | 'Task failed due to Spot VM preemption with exit code 50001.' + TaskStatus.State.FAILED | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null + null | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null + TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus.COMPLETED | Integer.MAX_VALUE | true | 'Task failed due to Spot VM preemption with exit code 50001.' + } + } From 1f1ee12efe3b80f6feeed7e719e4b69a963ec945 Mon Sep 17 00:00:00 2001 From: jorgee Date: Tue, 14 Oct 2025 17:52:42 +0200 Subject: [PATCH 2/4] Do not fallback to .exitcode when exit code is 0 Signed-off-by: jorgee --- .../aws/batch/AwsBatchTaskHandler.groovy | 6 +- .../aws/batch/AwsBatchTaskHandlerTest.groovy | 107 +++++++++++- .../azure/batch/AzBatchTaskHandler.groovy | 9 +- .../azure/batch/AzBatchTaskHandlerTest.groovy | 159 ++++++++++++++++++ .../main/nextflow/k8s/K8sTaskHandler.groovy | 4 +- .../nextflow/k8s/K8sTaskHandlerTest.groovy | 19 +++ 6 files changed, 297 insertions(+), 7 deletions(-) diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy index 636e28c0ef..477dc34471 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy @@ -303,11 +303,11 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler> job + 0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code + and: + result == true + handler.status == TaskStatus.COMPLETED + handler.task.exitStatus == 0 + } + + def 'should check if completed with non-zero exit code from scheduler'() { + given: + def task = new TaskRun() + def executor = Mock(AwsBatchExecutor) + def jobId = 'job-123' + def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING, executor: executor)) + and: + def job = JobDetail.builder().container(ContainerDetail.builder().exitCode(137).build()) + .status(JobStatus.FAILED) + .statusReason('Task terminated') + .build() + + when: + def result = handler.checkIfCompleted() + then: + + 1 * handler.describeJob(jobId) >> job + 0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code + 1 * executor.getJobOutputStream('job-123') >> null + and: + result == true + handler.status == TaskStatus.COMPLETED + handler.task.exitStatus == 137 + + } + + def 'should check if completed and fallback to exit file when scheduler exit code is null'() { + given: + def task = new TaskRun() + task.name = 'hello' + def jobId = 'job-123' + def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING)) + and: + + def job = JobDetail.builder().container(ContainerDetail.builder().build()) + .status(JobStatus.SUCCEEDED) + .build() + + + when: + def result = handler.checkIfCompleted() + then: + 1 * handler.describeJob('job-123') >> job + 1 * handler.readExitFile() >> 0 // Should read exit file as fallback + and: + result == true + handler.status == TaskStatus.COMPLETED + handler.task.exitStatus == 0 + + } + + def 'should check if completed no container exit code neither .exitcode file'() { + given: + def task = new TaskRun() + task.name = 'hello' + def jobId = 'job-123' + def executor = Mock(AwsBatchExecutor) + def handler = Spy(new AwsBatchTaskHandler(task: task, jobId: jobId, status: TaskStatus.RUNNING, executor: executor)) + and: + + def job = JobDetail.builder().container(ContainerDetail.builder().build()) + .status(JobStatus.SUCCEEDED) + .statusReason('Unknown termination') + .build() + + + when: + def result = handler.checkIfCompleted() + then: + 1 * handler.describeJob(jobId) >> job + 1 * handler.readExitFile() >> Integer.MAX_VALUE // Should read exit file as fallback + 1 * executor.getJobOutputStream(jobId) >> null + and: + result == true + handler.status == TaskStatus.COMPLETED + handler.task.exitStatus == Integer.MAX_VALUE + handler.task.error.message == 'Unknown termination' + + } } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy index a57b219535..baf7f311e8 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy @@ -16,6 +16,7 @@ package nextflow.cloud.azure.batch import nextflow.exception.ProcessException +import nextflow.util.TestOnly import java.nio.file.Path @@ -114,7 +115,8 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask { if( done ) { // finalize the task final info = batchService.getTask(taskKey).executionInfo - task.exitStatus = info?.exitCode ?: readExitFile() + // Try to get exit code from Azure batch API and fallback to .exitcode + task.exitStatus = info?.exitCode != null ? info.exitCode : readExitFile() task.stdout = outputFile task.stderr = errorFile status = TaskStatus.COMPLETED @@ -205,4 +207,9 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask { return machineInfo } + @TestOnly + protected setTaskKey(AzTaskKey key){ + this.taskKey = key + } + } diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy index d0aadd2324..8397dd2f03 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy @@ -1,5 +1,13 @@ package nextflow.cloud.azure.batch +import com.azure.compute.batch.models.BatchTask +import com.azure.compute.batch.models.BatchTaskExecutionInfo +import com.azure.compute.batch.models.BatchTaskFailureInfo +import com.azure.compute.batch.models.BatchTaskState +import com.azure.compute.batch.models.ErrorCategory +import com.sun.jna.platform.unix.X11 +import nextflow.processor.TaskStatus + import java.nio.file.Path import nextflow.cloud.types.CloudMachineInfo @@ -99,4 +107,155 @@ class AzBatchTaskHandlerTest extends Specification { trace.machineInfo.zone == 'west-eu' trace.machineInfo.priceModel == PriceModel.standard } + + def 'should check if completed with exit code from scheduler'() { + given: + def task = Spy(new TaskRun()){ + getContainer() >> 'ubuntu' + } + task.name = 'foo' + task.workDir = Path.of('/tmp/wdir') + def taskKey = new AzTaskKey('pool-123', 'job-456') + def azTask = new BatchTask() + def execInfo = new BatchTaskExecutionInfo(0,0) + execInfo.exitCode = 0 + azTask.executionInfo = execInfo + azTask.state = BatchTaskState.COMPLETED + + def batchService = Mock(AzBatchService){ + getTask(taskKey) >> azTask + } + def executor = Mock(AzBatchExecutor){ + getBatchService() >> batchService + } + def handler = Spy(new AzBatchTaskHandler(task, executor)){ + deleteTask(_,_) >> null + } + handler.status = TaskStatus.RUNNING + handler.taskKey = taskKey + + when: + def result = handler.checkIfCompleted() + then: + 0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code + and: + result == true + handler.task.exitStatus == 0 + handler.status == TaskStatus.COMPLETED + + } + + def 'should check if completed with non-zero exit code from scheduler'() { + given: + def task = Spy(new TaskRun()){ + getContainer() >> 'ubuntu' + } + task.name = 'foo' + task.workDir = Path.of('/tmp/wdir') + def taskKey = new AzTaskKey('pool-123', 'job-456') + def azTask = new BatchTask() + def execInfo = new BatchTaskExecutionInfo(0,0) + execInfo.exitCode = 137 + azTask.executionInfo = execInfo + azTask.state = BatchTaskState.COMPLETED + + def batchService = Mock(AzBatchService){ + getTask(taskKey) >> azTask + } + def executor = Mock(AzBatchExecutor){ + getBatchService() >> batchService + } + def handler = Spy(new AzBatchTaskHandler(task, executor)){ + deleteTask(_,_) >> null + } + handler.status = TaskStatus.RUNNING + handler.taskKey = taskKey + + when: + def result = handler.checkIfCompleted() + then: + 0 * handler.readExitFile() // Should NOT read exit file when scheduler provides exit code + and: + result == true + handler.task.exitStatus == 137 + handler.status == TaskStatus.COMPLETED + + + } + + def 'should check if completed and fallback to exit file when scheduler exit code is null'() { + given: + def task = Spy(new TaskRun()){ + getContainer() >> 'ubuntu' + } + task.name = 'foo' + task.workDir = Path.of('/tmp/wdir') + def taskKey = new AzTaskKey('pool-123', 'job-456') + def azTask = new BatchTask() + def execInfo = new BatchTaskExecutionInfo(0,0) + azTask.executionInfo = execInfo + azTask.state = BatchTaskState.COMPLETED + + def batchService = Mock(AzBatchService){ + getTask(taskKey) >> azTask + } + def executor = Mock(AzBatchExecutor){ + getBatchService() >> batchService + } + def handler = Spy(new AzBatchTaskHandler(task, executor)){ + deleteTask(_,_) >> null + } + handler.status = TaskStatus.RUNNING + handler.taskKey = taskKey + + when: + def result = handler.checkIfCompleted() + + then: + 1 * handler.readExitFile() >> 0 // Should read exit file as fallback + and: + result == true + handler.task.exitStatus == 0 + handler.status == TaskStatus.COMPLETED + } + + def 'should check if completed and no scheduler exit code neither .exitcode file'() { + given: + def task = Spy(new TaskRun()){ + getContainer() >> 'ubuntu' + } + task.name = 'foo' + task.workDir = Path.of('/tmp/wdir') + def taskKey = new AzTaskKey('pool-123', 'job-456') + def azTask = new BatchTask() + def execInfo = new BatchTaskExecutionInfo(0,0) + def failureInfo = new BatchTaskFailureInfo(ErrorCategory.USER_ERROR) + failureInfo.message = 'Unknown error' + execInfo.failureInfo = failureInfo + azTask.executionInfo = execInfo + azTask.state = BatchTaskState.COMPLETED + + def batchService = Mock(AzBatchService){ + getTask(taskKey) >> azTask + } + def executor = Mock(AzBatchExecutor){ + getBatchService() >> batchService + } + def handler = Spy(new AzBatchTaskHandler(task, executor)){ + deleteTask(_,_) >> null + } + handler.status = TaskStatus.RUNNING + handler.taskKey = taskKey + + when: + def result = handler.checkIfCompleted() + + then: + 1 * handler.readExitFile() >> Integer.MAX_VALUE // Should read exit file as fallback + and: + result == true + handler.task.exitStatus == Integer.MAX_VALUE + handler.status == TaskStatus.COMPLETED + handler.task.error.message == 'Unknown error' + } } diff --git a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy index e98da988cb..73e16fc407 100644 --- a/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy +++ b/plugins/nf-k8s/src/main/nextflow/k8s/K8sTaskHandler.groovy @@ -425,7 +425,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { } else { // finalize the task - // read the exit code from the K8s container terminated state, if 0 (successful) or missing + // read the exit code from the K8s container terminated state, if missing // take the exit code from the `.exitcode` file created by nextflow // the rationale is that in case of error (e.g. OOMKilled, pod eviction), the exit code from // the K8s API is more reliable because the container may terminate before the exit file is written @@ -433,7 +433,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#containerstateterminated-v1-core log.trace("[k8s] Container Terminated state ${state.terminated}") final k8sExitCode = (state.terminated as Map)?.exitCode as Integer - task.exitStatus = k8sExitCode ?: readExitFile() + task.exitStatus = k8sExitCode != null ? k8sExitCode : readExitFile() task.stdout = outputFile task.stderr = errorFile } diff --git a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy index 2f780ceb67..3e7ce893cd 100644 --- a/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/plugins/nf-k8s/src/test/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -474,6 +474,10 @@ class K8sTaskHandlerTest extends Specification { finishedAt: "2018-01-13T10:19:36Z", exitCode: 0 ] def fullState = [terminated: termState] + def noExitCodeTermState = [ reason: "Completed", + startedAt: "2018-01-13T10:09:36Z", + finishedAt: "2018-01-13T10:19:36Z" ] + def noExitCodeState = [terminated: noExitCodeTermState] and: def handler = Spy(new K8sTaskHandler(task: task, client:client, podName: POD_NAME, outputFile: OUT_FILE, errorFile: ERR_FILE)) @@ -496,6 +500,21 @@ class K8sTaskHandlerTest extends Specification { then: 1 * handler.getState() >> fullState 1 * handler.updateTimestamps(termState) + 1 * handler.deletePodIfSuccessful(task) >> null + 1 * handler.savePodLogOnError(task) >> null + handler.task.exitStatus == 0 + handler.task.@stdout == OUT_FILE + handler.task.@stderr == ERR_FILE + handler.status == TaskStatus.COMPLETED + handler.startTimeMillis == 1515838176000 + handler.completeTimeMillis == 1515838776000 + result == true + + when: + result = handler.checkIfCompleted() + then: + 1 * handler.getState() >> noExitCodeState + 1 * handler.updateTimestamps(noExitCodeTermState) 1 * handler.readExitFile() >> EXIT_STATUS 1 * handler.deletePodIfSuccessful(task) >> null 1 * handler.savePodLogOnError(task) >> null From 51597f1792b0d6ee6c6d4fddff79d4fe0fe916e3 Mon Sep 17 00:00:00 2001 From: jorgee Date: Wed, 15 Oct 2025 12:39:23 +0200 Subject: [PATCH 3/4] Update 500xx exit code management and remove comment Signed-off-by: jorgee --- .../cloud/google/batch/GoogleBatchTaskHandler.groovy | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 01ba07e1ac..5aff01ad56 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -547,7 +547,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { // finalize the task task.exitStatus = getExitCode() if( state == 'FAILED' ) { - if( task.exitStatus == Integer.MAX_VALUE ) + // When no exit code or 500XX codes, get the jobError reason from events + if( task.exitStatus == Integer.MAX_VALUE || task.exitStatus >= 50000) task.error = getJobError() task.stdout = executor.logging.stdout(uid, taskId) ?: outputFile task.stderr = executor.logging.stderr(uid, taskId) ?: errorFile @@ -575,12 +576,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { private Integer getExitCode(){ final events = client.getTaskStatus(jobId, taskId)?.getStatusEventsList() if( events ) { - log.debug("[GOOGLE BATCH] Getting exit code from events: $events") final batchExitCode = events.stream().filter(ev -> ev.hasTaskExecution()) .max( (ev1, ev2) -> Long.compare(ev1.getEventTime().seconds, ev2.getEventTime().seconds) ) .map(ev -> ev.getTaskExecution().getExitCode()) .orElse(null) - if( batchExitCode != null && batchExitCode < 50000) // Ignore 500XX codes, they will be managed later. + if( batchExitCode != null ) return batchExitCode } // fallback to read @@ -597,7 +597,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}" final error = lastEvent?.description - if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) || lastEvent?.taskExecution?.exitCode > 50000) { + if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find())) { return new ProcessException(error) } } From a2114e945f5534766a22659d22fd7fd7783005b3 Mon Sep 17 00:00:00 2001 From: jorgee Date: Wed, 15 Oct 2025 14:33:31 +0200 Subject: [PATCH 4/4] update tests with latests changes Signed-off-by: jorgee --- .../batch/GoogleBatchTaskHandlerTest.groovy | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy index 5918937fb1..7c8dac3fb2 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy @@ -731,14 +731,44 @@ class GoogleBatchTaskHandlerTest extends Specification { handler.task.error?.message == TASK_ERROR result == RESULT + where: + TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR + TaskStatus.State.FAILED | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null + null | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null + TaskStatus.State.FAILED | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null + null | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null + } + + def 'should check if completed when 500xx errors' () { + given: + def jobId = '1' + def taskId = '1' + def client = Mock(BatchClient){ + getTaskInArrayStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null } + getTaskStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null } + getJobStatus(jobId ) >> makeJobStatus(JobStatus.State.FAILED,DESC) + } + def logging = Mock(BatchLogging) + def executor = Mock(GoogleBatchExecutor){ + getLogging() >> logging + } + def task = new TaskRun() + task.name = 'hello' + def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor)) + when: + def result = handler.checkIfCompleted() + then: + 0 * handler.readExitFile() >> EXIT_STATUS + handler.status == TASK_STATUS + handler.task.exitStatus == EXIT_STATUS + handler.task.error?.message == TASK_ERROR + result == RESULT + where: TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR - TaskStatus.State.FAILED | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null - null | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null - TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | true | nextflow.processor.TaskStatus.COMPLETED | Integer.MAX_VALUE | true | 'Task failed due to Spot VM preemption with exit code 50001.' - TaskStatus.State.FAILED | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null - null | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null - TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus.COMPLETED | Integer.MAX_VALUE | true | 'Task failed due to Spot VM preemption with exit code 50001.' + TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | true | nextflow.processor.TaskStatus.COMPLETED | 50001 | true | 'Task failed due to Spot VM preemption with exit code 50001.' + TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus.COMPLETED | 50001 | true | 'Task failed due to Spot VM preemption with exit code 50001.' } + }