Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,11 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final job = describeJob(jobId)
final done = job?.status() in [JobStatus.SUCCEEDED, JobStatus.FAILED]
if( done ) {
// take the exit code of the container, if 0 (successful) or missing
// take the exit code of the container, if missing (null)
// take the exit code from the `.exitcode` file created by nextflow
// the rationale for this is that, in case of error, the exit code returned
// by the batch API is more reliable.
task.exitStatus = job.container().exitCode() ?: readExitFile()
task.exitStatus = job.container()?.exitCode() != null ? job.container().exitCode() : readExitFile()
// finalize the task
task.stdout = outputFile
if( job?.status() == JobStatus.FAILED || task.exitStatus==Integer.MAX_VALUE ) {
Expand All @@ -326,7 +326,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
return false
}

private int readExitFile() {
protected int readExitFile() {
try {
exitFile.text as Integer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.cloud.aws.batch

import software.amazon.awssdk.services.batch.model.JobStatus

import java.nio.file.Path
import java.time.Instant

Expand Down Expand Up @@ -43,6 +45,7 @@ import nextflow.script.ProcessConfig
import nextflow.util.CacheHelper
import nextflow.util.MemoryUnit
import software.amazon.awssdk.services.batch.BatchClient
import software.amazon.awssdk.services.batch.model.ContainerDetail
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsRequest
import software.amazon.awssdk.services.batch.model.DescribeJobDefinitionsResponse
import software.amazon.awssdk.services.batch.model.DescribeJobsRequest
Expand Down Expand Up @@ -1134,8 +1137,110 @@ class AwsBatchTaskHandlerTest extends Specification {
2 | true | false | 2
and:
null | true | true | 5 // <-- default to 5
0 | true | true | 5 // <-- default to 5
0 | true | true | 5 // <-- default to 5
1 | true | true | 1
2 | true | true | 2
}

def 'should check if completed with exit code from scheduler'() {
    // Verifies that an exit code of 0 reported by the job's container is
    // used as the task exit status without consulting readExitFile().
    given:
    def jobId = 'job-123'
    def taskRun = new TaskRun()
    def handler = Spy(new AwsBatchTaskHandler(task: taskRun, jobId: jobId, status: TaskStatus.RUNNING))

    and:
    // Job description: SUCCEEDED status, container exit code 0.
    def containerDetail = ContainerDetail.builder().exitCode(0).build()
    def jobDetail = JobDetail.builder()
            .container(containerDetail)
            .status(JobStatus.SUCCEEDED)
            .build()

    when:
    def completed = handler.checkIfCompleted()

    then:
    1 * handler.describeJob('job-123') >> jobDetail
    // The exit file must not be read when the scheduler provides an exit code.
    0 * handler.readExitFile()

    and:
    completed == true
    handler.status == TaskStatus.COMPLETED
    handler.task.exitStatus == 0
}

def 'should check if completed with non-zero exit code from scheduler'() {
    // Verifies that a non-zero container exit code (137) on a FAILED job is
    // used as the task exit status without consulting readExitFile().
    given:
    def jobId = 'job-123'
    def executor = Mock(AwsBatchExecutor)
    def taskRun = new TaskRun()
    def handler = Spy(new AwsBatchTaskHandler(task: taskRun, jobId: jobId, status: TaskStatus.RUNNING, executor: executor))

    and:
    // Job description: FAILED status with a status reason, container exit code 137.
    def containerDetail = ContainerDetail.builder().exitCode(137).build()
    def jobDetail = JobDetail.builder()
            .container(containerDetail)
            .status(JobStatus.FAILED)
            .statusReason('Task terminated')
            .build()

    when:
    def completed = handler.checkIfCompleted()

    then:
    1 * handler.describeJob(jobId) >> jobDetail
    // The exit file must not be read when the scheduler provides an exit code.
    0 * handler.readExitFile()
    // The job output stream is requested exactly once; the stub returns null.
    1 * executor.getJobOutputStream('job-123') >> null

    and:
    completed == true
    handler.status == TaskStatus.COMPLETED
    handler.task.exitStatus == 137
}

def 'should check if completed and fallback to exit file when scheduler exit code is null'() {
    // Verifies that readExitFile() supplies the exit status when the job's
    // container detail carries no exit code.
    given:
    def jobId = 'job-123'
    def taskRun = new TaskRun()
    taskRun.name = 'hello'
    def handler = Spy(new AwsBatchTaskHandler(task: taskRun, jobId: jobId, status: TaskStatus.RUNNING))

    and:
    // Job description: SUCCEEDED status, container built without an exit code.
    def containerDetail = ContainerDetail.builder().build()
    def jobDetail = JobDetail.builder()
            .container(containerDetail)
            .status(JobStatus.SUCCEEDED)
            .build()

    when:
    def completed = handler.checkIfCompleted()

    then:
    1 * handler.describeJob('job-123') >> jobDetail
    // The exit file is read exactly once as the fallback; stubbed to return 0.
    1 * handler.readExitFile() >> 0

    and:
    completed == true
    handler.status == TaskStatus.COMPLETED
    handler.task.exitStatus == 0
}

def 'should check if completed no container exit code neither .exitcode file'() {
    // Verifies the case where the container detail has no exit code and
    // readExitFile() is stubbed to return Integer.MAX_VALUE: the task is
    // completed with that exit status and an error whose message equals the
    // status reason set on the job.
    given:
    def jobId = 'job-123'
    def executor = Mock(AwsBatchExecutor)
    def taskRun = new TaskRun()
    taskRun.name = 'hello'
    def handler = Spy(new AwsBatchTaskHandler(task: taskRun, jobId: jobId, status: TaskStatus.RUNNING, executor: executor))

    and:
    // Job description: SUCCEEDED status with a status reason, no container exit code.
    def containerDetail = ContainerDetail.builder().build()
    def jobDetail = JobDetail.builder()
            .container(containerDetail)
            .status(JobStatus.SUCCEEDED)
            .statusReason('Unknown termination')
            .build()

    when:
    def completed = handler.checkIfCompleted()

    then:
    1 * handler.describeJob(jobId) >> jobDetail
    // The exit file is read exactly once as the fallback; stubbed to return Integer.MAX_VALUE.
    1 * handler.readExitFile() >> Integer.MAX_VALUE
    // The job output stream is requested exactly once; the stub returns null.
    1 * executor.getJobOutputStream(jobId) >> null

    and:
    completed == true
    handler.status == TaskStatus.COMPLETED
    handler.task.exitStatus == Integer.MAX_VALUE
    handler.task.error.message == 'Unknown termination'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package nextflow.cloud.azure.batch

import nextflow.exception.ProcessException
import nextflow.util.TestOnly

import java.nio.file.Path

Expand Down Expand Up @@ -114,7 +115,8 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
if( done ) {
// finalize the task
final info = batchService.getTask(taskKey).executionInfo
task.exitStatus = info?.exitCode ?: readExitFile()
// Try to get exit code from Azure batch API and fallback to .exitcode
task.exitStatus = info?.exitCode != null ? info.exitCode : readExitFile()
task.stdout = outputFile
task.stderr = errorFile
status = TaskStatus.COMPLETED
Expand Down Expand Up @@ -205,4 +207,9 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return machineInfo
}

/**
 * Assigns the given key to this handler's {@code taskKey} field.
 * Annotated {@code @TestOnly}; the specs for this handler use it to inject
 * a task key before calling {@code checkIfCompleted()}.
 *
 * @param key the task key to store on this handler
 */
@TestOnly
protected setTaskKey(AzTaskKey key){
this.taskKey = key
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package nextflow.cloud.azure.batch

import com.azure.compute.batch.models.BatchTask
import com.azure.compute.batch.models.BatchTaskExecutionInfo
import com.azure.compute.batch.models.BatchTaskFailureInfo
import com.azure.compute.batch.models.BatchTaskState
import com.azure.compute.batch.models.ErrorCategory
import com.sun.jna.platform.unix.X11
import nextflow.processor.TaskStatus

import java.nio.file.Path

import nextflow.cloud.types.CloudMachineInfo
Expand Down Expand Up @@ -99,4 +107,155 @@ class AzBatchTaskHandlerTest extends Specification {
trace.machineInfo.zone == 'west-eu'
trace.machineInfo.priceModel == PriceModel.standard
}

def 'should check if completed with exit code from scheduler'() {
    // Verifies that an exit code of 0 carried by the task's execution info
    // is used as the task exit status without consulting readExitFile().
    given:
    // Task run with a stubbed container name.
    def task = Spy(new TaskRun()) {
        getContainer() >> 'ubuntu'
    }
    task.name = 'foo'
    task.workDir = Path.of('/tmp/wdir')

    // Batch task in COMPLETED state whose execution info has exit code 0.
    def executionInfo = new BatchTaskExecutionInfo(0, 0)
    executionInfo.exitCode = 0
    def batchTask = new BatchTask()
    batchTask.executionInfo = executionInfo
    batchTask.state = BatchTaskState.COMPLETED

    // Service and executor mocks returning the batch task for the key below.
    def taskKey = new AzTaskKey('pool-123', 'job-456')
    def batchService = Mock(AzBatchService) {
        getTask(taskKey) >> batchTask
    }
    def executor = Mock(AzBatchExecutor) {
        getBatchService() >> batchService
    }

    // Handler under test, with deleteTask stubbed out.
    def handler = Spy(new AzBatchTaskHandler(task, executor)) {
        deleteTask(_, _) >> null
    }
    handler.status = TaskStatus.RUNNING
    handler.taskKey = taskKey

    when:
    def completed = handler.checkIfCompleted()

    then:
    // The exit file must not be read when the scheduler provides an exit code.
    0 * handler.readExitFile()

    and:
    completed == true
    handler.task.exitStatus == 0
    handler.status == TaskStatus.COMPLETED
}

def 'should check if completed with non-zero exit code from scheduler'() {
    // Verifies that a non-zero exit code (137) carried by the task's execution
    // info is used as the task exit status without consulting readExitFile().
    given:
    // Task run with a stubbed container name.
    def task = Spy(new TaskRun()) {
        getContainer() >> 'ubuntu'
    }
    task.name = 'foo'
    task.workDir = Path.of('/tmp/wdir')

    // Batch task in COMPLETED state whose execution info has exit code 137.
    def executionInfo = new BatchTaskExecutionInfo(0, 0)
    executionInfo.exitCode = 137
    def batchTask = new BatchTask()
    batchTask.executionInfo = executionInfo
    batchTask.state = BatchTaskState.COMPLETED

    // Service and executor mocks returning the batch task for the key below.
    def taskKey = new AzTaskKey('pool-123', 'job-456')
    def batchService = Mock(AzBatchService) {
        getTask(taskKey) >> batchTask
    }
    def executor = Mock(AzBatchExecutor) {
        getBatchService() >> batchService
    }

    // Handler under test, with deleteTask stubbed out.
    def handler = Spy(new AzBatchTaskHandler(task, executor)) {
        deleteTask(_, _) >> null
    }
    handler.status = TaskStatus.RUNNING
    handler.taskKey = taskKey

    when:
    def completed = handler.checkIfCompleted()

    then:
    // The exit file must not be read when the scheduler provides an exit code.
    0 * handler.readExitFile()

    and:
    completed == true
    handler.task.exitStatus == 137
    handler.status == TaskStatus.COMPLETED
}

def 'should check if completed and fallback to exit file when scheduler exit code is null'() {
    // Verifies that readExitFile() supplies the exit status when no exit code
    // is assigned on the task's execution info.
    given:
    // Task run with a stubbed container name.
    def task = Spy(new TaskRun()) {
        getContainer() >> 'ubuntu'
    }
    task.name = 'foo'
    task.workDir = Path.of('/tmp/wdir')

    // Batch task in COMPLETED state; no exit code is assigned on the execution info.
    def executionInfo = new BatchTaskExecutionInfo(0, 0)
    def batchTask = new BatchTask()
    batchTask.executionInfo = executionInfo
    batchTask.state = BatchTaskState.COMPLETED

    // Service and executor mocks returning the batch task for the key below.
    def taskKey = new AzTaskKey('pool-123', 'job-456')
    def batchService = Mock(AzBatchService) {
        getTask(taskKey) >> batchTask
    }
    def executor = Mock(AzBatchExecutor) {
        getBatchService() >> batchService
    }

    // Handler under test, with deleteTask stubbed out.
    def handler = Spy(new AzBatchTaskHandler(task, executor)) {
        deleteTask(_, _) >> null
    }
    handler.status = TaskStatus.RUNNING
    handler.taskKey = taskKey

    when:
    def completed = handler.checkIfCompleted()

    then:
    // The exit file is read exactly once as the fallback; stubbed to return 0.
    1 * handler.readExitFile() >> 0

    and:
    completed == true
    handler.task.exitStatus == 0
    handler.status == TaskStatus.COMPLETED
}

def 'should check if completed and no scheduler exit code neither .exitcode file'() {
    // Verifies the case where no exit code is assigned on the execution info
    // and readExitFile() is stubbed to return Integer.MAX_VALUE: the task is
    // completed with that exit status and an error whose message equals the
    // failure-info message set below.
    given:
    // Task run with a stubbed container name.
    def task = Spy(new TaskRun()) {
        getContainer() >> 'ubuntu'
    }
    task.name = 'foo'
    task.workDir = Path.of('/tmp/wdir')

    // Failure info attached to an execution info that has no exit code assigned.
    def failure = new BatchTaskFailureInfo(ErrorCategory.USER_ERROR)
    failure.message = 'Unknown error'
    def executionInfo = new BatchTaskExecutionInfo(0, 0)
    executionInfo.failureInfo = failure

    // Batch task in COMPLETED state carrying that execution info.
    def batchTask = new BatchTask()
    batchTask.executionInfo = executionInfo
    batchTask.state = BatchTaskState.COMPLETED

    // Service and executor mocks returning the batch task for the key below.
    def taskKey = new AzTaskKey('pool-123', 'job-456')
    def batchService = Mock(AzBatchService) {
        getTask(taskKey) >> batchTask
    }
    def executor = Mock(AzBatchExecutor) {
        getBatchService() >> batchService
    }

    // Handler under test, with deleteTask stubbed out.
    def handler = Spy(new AzBatchTaskHandler(task, executor)) {
        deleteTask(_, _) >> null
    }
    handler.status = TaskStatus.RUNNING
    handler.taskKey = taskKey

    when:
    def completed = handler.checkIfCompleted()

    then:
    // The exit file is read exactly once as the fallback; stubbed to return Integer.MAX_VALUE.
    1 * handler.readExitFile() >> Integer.MAX_VALUE

    and:
    completed == true
    handler.task.exitStatus == Integer.MAX_VALUE
    handler.status == TaskStatus.COMPLETED
    handler.task.error.message == 'Unknown error'
}
}
Loading