Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class FusionConfig implements ConfigScope {

final static public String DEFAULT_TAGS = "[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true)"

final static public int DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS = 5

final static public String FUSION_PATH = '/usr/bin/fusion'

final static private String PRODUCT_NAME = 'fusion'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import nextflow.exception.ProcessSubmitException
import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.BashWrapperBuilder
import nextflow.fusion.FusionAwareTask
import nextflow.fusion.FusionConfig
import nextflow.processor.BatchContext
import nextflow.processor.BatchHandler
import nextflow.processor.TaskArrayRun
Expand Down Expand Up @@ -753,7 +754,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
return result
// when fusion snapshot is enabled max attempt should be > 0
// to enable to allow snapshot retry the job execution in a new ec2 instance
return fusionEnabled() && fusionConfig().snapshotsEnabled() ? 5 : 0
return fusionEnabled() && fusionConfig().snapshotsEnabled() ? FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS : 0
}

protected String getJobName(TaskRun task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import nextflow.exception.ProcessUnrecoverableException
import nextflow.executor.BashWrapperBuilder
import nextflow.executor.res.DiskResource
import nextflow.fusion.FusionAwareTask
import nextflow.fusion.FusionConfig
import nextflow.fusion.FusionScriptLauncher
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskConfig
Expand Down Expand Up @@ -262,12 +263,13 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
.addAllVolumes( launcher.getVolumes() )

// retry on spot reclaim
if( batchConfig.maxSpotAttempts ) {
final attempts = maxSpotAttempts()
if( attempts > 0 ) {
// Note: Google Batch uses the special exit status 50001 to signal
// the execution was terminated due a spot reclaim. When this happens
// The policy re-execute the jobs automatically up to `maxSpotAttempts` times
taskSpec
.setMaxRetryCount( batchConfig.maxSpotAttempts )
.setMaxRetryCount( attempts )
.addLifecyclePolicies(
LifecyclePolicy.newBuilder()
.setActionCondition(
Expand Down Expand Up @@ -468,6 +470,15 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}
}

protected int maxSpotAttempts() {
final result = batchConfig.maxSpotAttempts
if( result > 0 )
return result
// when fusion snapshot is enabled max attempt should be > 0
// to enable to allow snapshot retry the job execution in a new compute instance
return fusionEnabled() && fusionConfig().snapshotsEnabled() ? FusionConfig.DEFAULT_SNAPSHOT_MAX_SPOT_ATTEMPTS : 0
}

/**
* @return Retrieve the submitted task state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.google.cloud.batch.v1.StatusEvent
import com.google.cloud.batch.v1.TaskStatus
import com.google.cloud.batch.v1.Volume
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem
import nextflow.Global
import nextflow.Session
import nextflow.SysEnv
import nextflow.cloud.google.batch.client.BatchClient
Expand Down Expand Up @@ -773,5 +774,47 @@ class GoogleBatchTaskHandlerTest extends Specification {
TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus.COMPLETED | 50001 | true | 'Task failed due to Spot VM preemption with exit code 50001.'
}

def 'should validate max spot attempts' () {
given:
Global.config = [fusion: [enabled: FUSION, snapshots: SNAPSHOTS]]
def workDir = Path.of('/work/dir')
def client = Mock(BatchClient)
def batchConfig = Mock(BatchConfig) { getMaxSpotAttempts() >> ATTEMPTS }
and:
def executor = Mock(GoogleBatchExecutor) {
getBatchConfig() >> batchConfig
getClient() >> client
isFusionEnabled() >> FUSION
}
def processor = Mock(TaskProcessor) { getExecutor() >> executor }
def task = Mock(TaskRun) {
getHashLog() >> '1234567890'
getWorkDir() >> workDir
getProcessor() >> processor
}
def handler = Spy(new GoogleBatchTaskHandler(task, executor)) {
fusionEnabled() >> FUSION
}

expect:
handler.maxSpotAttempts() == EXPECTED

cleanup:
Global.config = null

where:
ATTEMPTS | FUSION | SNAPSHOTS | EXPECTED
0 | false | false | 0
1 | false | false | 1
2 | false | false | 2
and:
0 | true | false | 0
1 | true | false | 1
2 | true | false | 2
and:
0 | true | true | 5 // <-- default to 5
1 | true | true | 1
2 | true | true | 2
}

}
Loading