Skip to content

Commit

Permalink
Merge pull request #52 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.0.11
  • Loading branch information
civitaspo authored Jan 24, 2019
2 parents 2399181 + 782e20d commit 2d394b0
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 25 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
0.0.11 (2019-01-24)
===================
* [Enhancement] `ecs_task.wait>` operator supports changeable interval and exponential backoff storategy. @Mulyu++

0.0.10 (2018-12-26)
===================
* [Enhancement] Shorten the family name with MurmurHash3 if auto-generated family name exceeds 255 letters.
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ _export:
repositories:
- https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-ecs_task:0.0.10
- pro.civitaspo:digdag-operator-ecs_task:0.0.11
ecs_task:
auth_method: profile
tmp_storage:
Expand Down Expand Up @@ -210,7 +210,11 @@ In addition, the below configurations exist.
- **condition**: The condition of tasks to wait. Available values are `"all"` or `"any"`. (string, default: `"all"`)
- **status**: The status of tasks to wait. Available values are `"PENDING"`, `"RUNNING"`, or `"STOPPED"` (string, default: `"STOPPED"`)
- **ignore_failure**: Ignore even if any tasks exit with any status. This option is true, then the behaviour includes one of when **ignore_exit_code** is `true`. (boolean, default: `false`)
- **ignore_exit_code**: Ignore even if any tasks exit with any exit code. When the containers of the task include one that does not have exit code, it is not ignored even if this option is `true`. (boolean, default: `false`)
- **ignore_exit_code**: Ignore even if any tasks exit with any exit code. When the containers of the task include one that does not have exit code, it is not ignored even if this option is `true`. (boolean, default: `false`)
- **polling_strategy**: The polling strategy settings of wait.
- **interval_type**: The interval type of wait. Available values are `"constant"` or `"exponential"`. (string, default: `"constant"`)
- **limit**: Max number of polling try. (integer, optional)
- **interval**: Delay interval of wait. The time unit is seconds. (integer, default: `1`)

## Configuration for `ecs_task.result>` operator

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.10'
version = '0.0.11'

def digdagVersion = '0.9.31'
def scalaSemanticVersion = "2.12.6"
Expand Down
13 changes: 12 additions & 1 deletion example/ecs_task.sh/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
message:
message: 'hello ecs_task.sh'
created_by: civitaspo

+step1:
+exceeds-255-letters:
+dummy-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:
Expand All @@ -21,5 +21,16 @@
message:
message: 'hello ecs_task.sh'
created_by: civitaspo
+step2:
ecs_task.sh>: sleep 15
image: civitaspo/digdag-awscli:latest
_export:
ecs_task:
wait:
polling_strategy:
interval_type: exponential
limit: 4
interval: 2



2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-ecs_task:0.0.10
- pro.civitaspo:digdag-operator-ecs_task:0.0.11
ecs_task:
auth_method: profile
tmp_storage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package pro.civitaspo.digdag.plugin

package object ecs_task {

val VERSION: String = "0.0.10"
val VERSION: String = "0.0.11"

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ class EcsTaskWaitOperator(operatorName: String, context: OperatorContext, system
val status: String = params.get("status", classOf[String], "STOPPED")
val ignoreFailure: Boolean = params.get("ignore_failure", classOf[Boolean], false)
val ignoreExitCode: Boolean = params.get("ignore_exit_code", classOf[Boolean], false)
val pollingStrategy: Config = params.getNestedOrGetEmpty("polling_strategy")

override def runTask(): TaskResult = {
val req: DescribeTasksRequest = new DescribeTasksRequest()
.withCluster(cluster)
.withTasks(tasks: _*)

aws.withEcs { ecs =>
val waiter: EcsTaskWaiter = EcsTaskWaiter(logger = logger, ecs = ecs, timeout = timeout, condition = condition, status = status)
val waiter: EcsTaskWaiter =
EcsTaskWaiter(logger = logger, ecs = ecs, timeout = timeout, condition = condition, status = status, pollingStrategy = pollingStrategy)
try {
waiter.wait(req)
} catch {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
package pro.civitaspo.digdag.plugin.ecs_task.wait
import java.util.concurrent.{Executors, ExecutorService}
import java.util.concurrent.{ExecutorService, Executors}

import com.amazonaws.services.ecs.AmazonECS
import com.amazonaws.services.ecs.model.{DescribeTasksRequest, DescribeTasksResult}
import com.amazonaws.services.ecs.waiters.DescribeTasksFunction
import com.amazonaws.waiters.{
FixedDelayStrategy,
MaxAttemptsRetryStrategy,
PollingStrategy,
Waiter,
WaiterAcceptor,
WaiterBuilder,
WaiterParameters,
WaiterState,
WaiterTimedOutException
}
import io.digdag.client.config.ConfigException
import com.amazonaws.waiters.PollingStrategy.DelayStrategy
import com.amazonaws.waiters._
import io.digdag.client.config.{Config, ConfigException}
import io.digdag.util.DurationParam
import org.slf4j.Logger

Expand All @@ -27,9 +18,38 @@ case class EcsTaskWaiter(
executorService: ExecutorService = Executors.newFixedThreadPool(50),
timeout: DurationParam,
condition: String,
status: String
status: String,
pollingStrategy: Config
) {

sealed trait IntervalType {
def value: String = toString
}

object IntervalType {
case object constant extends IntervalType
case object exponential extends IntervalType
private val values = Seq(constant, exponential)

def from(value: String): IntervalType =
values.find(_.value == value).getOrElse {
val message: String = s"""interval_type: \"$value\" is not supported. Available `interval_type`s are \"constant\", \"exponential\"."""
throw new ConfigException(message)
}
}

val limit: Int = pollingStrategy.get("limit", classOf[Int], Int.MaxValue)
val interval: Int = pollingStrategy.get("interval", classOf[Int], 1)
val intervalType: String = pollingStrategy.get("interval_type", classOf[String], IntervalType.constant.value)

private def delayStrategy: DelayStrategy =
IntervalType.from(intervalType) match {
case IntervalType.constant =>
new FixedDelayStrategy(interval)
case IntervalType.exponential =>
new ExponentialBackoffDelayStrategy(interval)
}

def wait(req: DescribeTasksRequest): Unit = {
newWaiter().run(new WaiterParameters[DescribeTasksRequest]().withRequest(req))
}
Expand Down Expand Up @@ -71,10 +91,7 @@ case class EcsTaskWaiter(
}

private def newPollingStrategy(): PollingStrategy = {
new PollingStrategy(
new MaxAttemptsRetryStrategy(Int.MaxValue),
new FixedDelayStrategy(1) // seconds
)
new PollingStrategy(new MaxAttemptsRetryStrategy(limit), delayStrategy)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package pro.civitaspo.digdag.plugin.ecs_task.wait

import com.amazonaws.waiters.PollingStrategy.DelayStrategy
import com.amazonaws.waiters.PollingStrategyContext

class ExponentialBackoffDelayStrategy(interval: Int) extends DelayStrategy {

override def delayBeforeNextRetry(pollingStrategyContext: PollingStrategyContext): Unit = {
val nextDurationSec =
interval * Math.pow(2, pollingStrategyContext.getRetriesAttempted)

Thread.sleep(nextDurationSec.toLong * 1000)
}
}

0 comments on commit 2d394b0

Please sign in to comment.