diff --git a/CHANGELOG.md b/CHANGELOG.md index f84ec18..7a285cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +0.0.11 (2019-01-24) +=================== +* [Enhancement] `ecs_task.wait>` operator supports changeable interval and exponential backoff storategy. @Mulyu++ + 0.0.10 (2018-12-26) =================== * [Enhancement] Shorten the family name with MurmurHash3 if auto-generated family name exceeds 255 letters. diff --git a/README.md b/README.md index ddf5ef0..c7eb099 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.10 + - pro.civitaspo:digdag-operator-ecs_task:0.0.11 ecs_task: auth_method: profile tmp_storage: @@ -210,7 +210,11 @@ In addition, the below configurations exist. - **condition**: The condition of tasks to wait. Available values are `"all"` or `"any"`. (string, default: `"all"`) - **status**: The status of tasks to wait. Available values are `"PENDING"`, `"RUNNING"`, or `"STOPPED"` (string, default: `"STOPPED"`) - **ignore_failure**: Ignore even if any tasks exit with any status. This option is true, then the behaviour includes one of when **ignore_exit_code** is `true`. (boolean, default: `false`) -- **ignore_exit_code**: Ignore even if any tasks exit with any exit code. When the containers of the task include one that does not have exit code, it is not ignored even if this option is `true`. (boolean, default: `false`) +- **ignore_exit_code**: Ignore even if any tasks exit with any exit code. When the containers of the task include one that does not have exit code, it is not ignored even if this option is `true`. (boolean, default: `false`) +- **polling_strategy**: The polling strategy settings of wait. + - **interval_type**: The interval type of wait. Available values are `"constant"` or `"exponential"`. (string, default: `"constant"`) + - **limit**: Max number of polling try. (integer, optional) + - **interval**: Delay interval of wait. The time unit is seconds. (integer, default: `1`) ## Configuration for `ecs_task.result>` operator diff --git a/build.gradle b/build.gradle index 5c4bf99..a62ac09 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.10' +version = '0.0.11' def digdagVersion = '0.9.31' def scalaSemanticVersion = "2.12.6" diff --git a/example/ecs_task.sh/example.dig b/example/ecs_task.sh/example.dig index 0624b44..7d232f7 100644 --- a/example/ecs_task.sh/example.dig +++ b/example/ecs_task.sh/example.dig @@ -7,7 +7,7 @@ message: message: 'hello ecs_task.sh' created_by: civitaspo - + +step1: +exceeds-255-letters: +dummy-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx: @@ -21,5 +21,16 @@ message: message: 'hello ecs_task.sh' created_by: civitaspo + +step2: + ecs_task.sh>: sleep 15 + image: civitaspo/digdag-awscli:latest + _export: + ecs_task: + wait: + polling_strategy: + interval_type: exponential + limit: 4 + interval: 2 + diff --git a/example/example.dig b/example/example.dig index b4c67a7..3d6b616 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.10 + - pro.civitaspo:digdag-operator-ecs_task:0.0.11 ecs_task: auth_method: profile tmp_storage: diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala index a3745a0..ff1c5c5 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/package.scala @@ -2,6 +2,6 @@ package pro.civitaspo.digdag.plugin package object ecs_task { - val VERSION: String = "0.0.10" + val VERSION: String = "0.0.11" } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaitOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaitOperator.scala index 67bdefd..1951298 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaitOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaitOperator.scala @@ -18,6 +18,7 @@ class EcsTaskWaitOperator(operatorName: String, context: OperatorContext, system val status: String = params.get("status", classOf[String], "STOPPED") val ignoreFailure: Boolean = params.get("ignore_failure", classOf[Boolean], false) val ignoreExitCode: Boolean = params.get("ignore_exit_code", classOf[Boolean], false) + val pollingStrategy: Config = params.getNestedOrGetEmpty("polling_strategy") override def runTask(): TaskResult = { val req: DescribeTasksRequest = new DescribeTasksRequest() @@ -25,7 +26,8 @@ class EcsTaskWaitOperator(operatorName: String, context: OperatorContext, system .withTasks(tasks: _*) aws.withEcs { ecs => - val waiter: EcsTaskWaiter = EcsTaskWaiter(logger = logger, ecs = ecs, timeout = timeout, condition = condition, status = status) + val waiter: EcsTaskWaiter = + EcsTaskWaiter(logger = logger, ecs = ecs, timeout = timeout, condition = condition, status = status, pollingStrategy = pollingStrategy) try { waiter.wait(req) } catch { diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaiter.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaiter.scala index 3cca6ef..40cf380 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaiter.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/EcsTaskWaiter.scala @@ -1,21 +1,12 @@ package pro.civitaspo.digdag.plugin.ecs_task.wait -import java.util.concurrent.{Executors, ExecutorService} +import java.util.concurrent.{ExecutorService, Executors} import com.amazonaws.services.ecs.AmazonECS import com.amazonaws.services.ecs.model.{DescribeTasksRequest, DescribeTasksResult} import com.amazonaws.services.ecs.waiters.DescribeTasksFunction -import com.amazonaws.waiters.{ - FixedDelayStrategy, - MaxAttemptsRetryStrategy, - PollingStrategy, - Waiter, - WaiterAcceptor, - WaiterBuilder, - WaiterParameters, - WaiterState, - WaiterTimedOutException -} -import io.digdag.client.config.ConfigException +import com.amazonaws.waiters.PollingStrategy.DelayStrategy +import com.amazonaws.waiters._ +import io.digdag.client.config.{Config, ConfigException} import io.digdag.util.DurationParam import org.slf4j.Logger @@ -27,9 +18,38 @@ case class EcsTaskWaiter( executorService: ExecutorService = Executors.newFixedThreadPool(50), timeout: DurationParam, condition: String, - status: String + status: String, + pollingStrategy: Config ) { + sealed trait IntervalType { + def value: String = toString + } + + object IntervalType { + case object constant extends IntervalType + case object exponential extends IntervalType + private val values = Seq(constant, exponential) + + def from(value: String): IntervalType = + values.find(_.value == value).getOrElse { + val message: String = s"""interval_type: \"$value\" is not supported. Available `interval_type`s are \"constant\", \"exponential\".""" + throw new ConfigException(message) + } + } + + val limit: Int = pollingStrategy.get("limit", classOf[Int], Int.MaxValue) + val interval: Int = pollingStrategy.get("interval", classOf[Int], 1) + val intervalType: String = pollingStrategy.get("interval_type", classOf[String], IntervalType.constant.value) + + private def delayStrategy: DelayStrategy = + IntervalType.from(intervalType) match { + case IntervalType.constant => + new FixedDelayStrategy(interval) + case IntervalType.exponential => + new ExponentialBackoffDelayStrategy(interval) + } + def wait(req: DescribeTasksRequest): Unit = { newWaiter().run(new WaiterParameters[DescribeTasksRequest]().withRequest(req)) } @@ -71,10 +91,7 @@ case class EcsTaskWaiter( } private def newPollingStrategy(): PollingStrategy = { - new PollingStrategy( - new MaxAttemptsRetryStrategy(Int.MaxValue), - new FixedDelayStrategy(1) // seconds - ) + new PollingStrategy(new MaxAttemptsRetryStrategy(limit), delayStrategy) } } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/ExponentialBackoffDelayStrategy.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/ExponentialBackoffDelayStrategy.scala new file mode 100644 index 0000000..d7a80f1 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/wait/ExponentialBackoffDelayStrategy.scala @@ -0,0 +1,14 @@ +package pro.civitaspo.digdag.plugin.ecs_task.wait + +import com.amazonaws.waiters.PollingStrategy.DelayStrategy +import com.amazonaws.waiters.PollingStrategyContext + +class ExponentialBackoffDelayStrategy(interval: Int) extends DelayStrategy { + + override def delayBeforeNextRetry(pollingStrategyContext: PollingStrategyContext): Unit = { + val nextDurationSec = + interval * Math.pow(2, pollingStrategyContext.getRetriesAttempted) + + Thread.sleep(nextDurationSec.toLong * 1000) + } +}