Skip to content

Commit

Permalink
Merge pull request #21 from civitaspo/develop
Browse files Browse the repository at this point in the history
v0.0.4
  • Loading branch information
civitaspo authored Nov 5, 2018
2 parents abdeb51 + 3c669e7 commit 9746db2
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 58 deletions.
11 changes: 9 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
0.0.4 (2018-11-06)
==================

* [Experimental] Implement `ecs_task.embulk>` operator.
* [Enhancement] Write README for scripting operators.
* [Enhancement] Make family name more configurable for scripting operators.

0.0.3 (2018-10-30)
======================
==================

* [Breaking Change] Do not use enum parameters directly because the enums require upper camel case ( `ecs_task.{py,register,run}>` operator)
* [Enhancement] Rename the configuration key: `additional_containers` to `sidecars` ( `ecs_task.py>` operator)
Expand All @@ -16,7 +23,7 @@
0.0.2 (2018-10-29)
==================

* [Experimental] Implement ecs_task.py> operator. (No document yet)
* [Experimental] Implement `ecs_task.py>` operator. (No document yet)
* [Fix] Stop correctly after task run to shutdown TransferManager after processing.

0.0.1 (2018-10-23)
Expand Down
87 changes: 86 additions & 1 deletion README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'pro.civitaspo'
version = '0.0.3'
version = '0.0.4'

def digdagVersion = '0.9.31'
def scalaSemanticVersion = "2.12.6"
Expand Down
2 changes: 1 addition & 1 deletion example/example.dig
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ _export:
- file://${repos}
# - https://jitpack.io
dependencies:
- pro.civitaspo:digdag-operator-ecs_task:0.0.3
- pro.civitaspo:digdag-operator-ecs_task:0.0.4
ecs_task:
auth_method: profile

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Container entry script for the embulk task.
#
# Flow: download the staged files from S3, run the setup command and then
# embulk inside ./workspace, write the embulk exit status into out.json, and
# upload out.json / stdout.log / stderr.log back to the same S3 location.
#
# The ECS_TASK_EMBULK_BUCKET, ECS_TASK_EMBULK_PREFIX and
# ECS_TASK_EMBULK_SETUP_COMMAND placeholders are expected to be filled in
# before this script runs (presumably by the operator's template rendering;
# verify against the code that uploads this file).

# -e: abort on the first failing command, -x: trace every command.
# pipefail: a pipeline fails if any command in it fails, not only the last one.
set -ex
set -o pipefail

mkdir -p ./digdag-operator-ecs_task
cd digdag-operator-ecs_task

# Create the result files up front so the uploads at the end always have
# something to send, even if nothing is ever written to them.
touch out.json stdout.log stderr.log

# Download everything staged under the S3 prefix into the current directory.
# The rest of the script relies on it containing config.yml and workspace/.
aws s3 cp s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/ ./ --recursive

# Run from inside the downloaded workspace; the log files and config.yml
# live one level up, hence the ../ paths below.
cd workspace

# Disable -e so that a failing embulk run does not abort the script before
# its exit status has been recorded and the logs uploaded.
set +e

# Run the setup command. stderr is appended to stderr.log; stdout is shown
# and appended to stdout.log.
# NOTE(review): the exit status of this pipeline is never captured, so a
# failing setup command is not reported on its own - confirm this is intended.
${ECS_TASK_EMBULK_SETUP_COMMAND} \
2>> ../stderr.log \
| tee -a ../stdout.log

# Run embulk with the staged config, using the same log redirection.
embulk run ../config.yml \
2>> ../stderr.log \
| tee -a ../stdout.log

# Capture the status of the embulk pipeline. This must stay directly after
# the pipeline; with pipefail set, a non-zero embulk status is not masked by
# a successful tee.
EXIT_CODE=$?

# Re-enable abort-on-error for the bookkeeping steps below.
set -e

# Return to the directory that holds out.json and the log files.
cd ..

# Replay the collected stderr on this process's stderr so the container's
# logging driver receives it.
cat stderr.log 1>&2

# Write out.json. The heredoc delimiter is unquoted, so the shell expands
# EXIT_CODE into the "exit_code" field.
cat <<EOF > out.json
{
"subtask_config": {},
"export_params": {},
"store_params": {},
"status_params": {
"exit_code": $EXIT_CODE
}
}
EOF

# Upload the result file and both logs next to the staged inputs.
aws s3 cp ./out.json s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/
aws s3 cp ./stdout.log s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/
aws s3 cp ./stderr.log s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/

# Exit with the embulk exit code so the container status mirrors the run.
exit $EXIT_CODE

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.digdag.client.config.Config
import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine}
import javax.inject.Inject
import pro.civitaspo.digdag.plugin.ecs_task.command.EcsTaskCommandResultInternalOperator
import pro.civitaspo.digdag.plugin.ecs_task.embulk.EcsTaskEmbulkOperator
import pro.civitaspo.digdag.plugin.ecs_task.py.EcsTaskPyOperator
import pro.civitaspo.digdag.plugin.ecs_task.register.EcsTaskRegisterOperator
import pro.civitaspo.digdag.plugin.ecs_task.result.EcsTaskResultOperator
Expand All @@ -22,6 +23,7 @@ object EcsTaskPlugin {

override def get(): JList[OperatorFactory] = {
JArrays.asList(
operatorFactory("ecs_task.embulk", classOf[EcsTaskEmbulkOperator]),
operatorFactory("ecs_task.py", classOf[EcsTaskPyOperator]),
operatorFactory("ecs_task.command_result_internal", classOf[EcsTaskCommandResultInternalOperator]),
operatorFactory("ecs_task.register", classOf[EcsTaskRegisterOperator]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import io.digdag.spi.TaskResult

trait EcsTaskCommandOperator {

val runner: EcsTaskCommandRunner
def createRunner(): EcsTaskCommandRunner

def additionalEnvironments(): Map[String, String]

def uploadScript(): AmazonS3URI
def prepare(): Unit

def runTask(): TaskResult = {
runner.run(scriptsLocationPrefix = uploadScript())
prepare()
createRunner().run()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ class EcsTaskCommandResultInternalOperator(operatorName: String, context: Operat
val exitCode: Int = statusParams.get("exit_code", classOf[Int])

if (exitCode != 0) {
val errorMessage: String = statusParams.get("error_message", classOf[String])
val errorStackTrace: String = statusParams.get("error_stacktrace", classOf[String])
throw new RuntimeException(s"message: $errorMessage, stacktrace: $errorStackTrace")
val errorMessage: String = statusParams.get("error_message", classOf[String], "")
val errorStackTrace: String = statusParams.get("error_stacktrace", classOf[String], "")
val stdout: String = Try(loadStdoutLogContent()).getOrElse("")
val stderr: String = Try(loadStderrLogContent()).getOrElse("")
throw new RuntimeException(s"message: '$errorMessage',\nstacktrace: '$errorStackTrace',\nstdout: '$stdout'\nstderr: '$stderr'")
}

TaskResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ import pro.civitaspo.digdag.plugin.ecs_task.aws.AwsConf

import scala.collection.JavaConverters._

case class EcsTaskCommandRunner(params: Config, environments: Map[String, String], awsConf: AwsConf, logger: Logger) {
case class EcsTaskCommandRunner(
scriptsLocationPrefix: AmazonS3URI,
script: String,
params: Config,
environments: Map[String, String],
awsConf: AwsConf,
logger: Logger
) {

val cf: ConfigFactory = params.getFactory

Expand All @@ -20,15 +27,11 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String
val cpu: Optional[String] = params.getOptional("cpu", classOf[String])
val executionRoleArn: Optional[String] = params.getOptional("execution_role_arn", classOf[String])

val family: String = params.get(
"family",
classOf[String],
params
.get("task_name", classOf[String])
.replaceAll("\\+", "_")
.replaceAll("\\^", "_")
.replaceAll("\\=", "_")
)
val taskName: String = params.get("task_name", classOf[String])
val familyPrefix: String = params.get("family_prefix", classOf[String], "")
val familySuffix: String = params.get("family_suffix", classOf[String], "")
val familyInfix: String = params.get("family_infix", classOf[String], taskName.replaceAll("\\+", "_").replaceAll("\\^", "_").replaceAll("\\=", "_"))
val family: String = params.get("family", classOf[String], s"$familyPrefix$familyInfix$familySuffix")
val memory: Optional[String] = params.getOptional("memory", classOf[String])
val networkMode: Optional[String] = params.getOptional("network_mode", classOf[String])
// NOTE: Use `ecs_task.run>`'s one.
Expand Down Expand Up @@ -95,22 +98,22 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String
// For ecs_task.wait operator
val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("15m"))

def run(scriptsLocationPrefix: AmazonS3URI): TaskResult = {
def run(): TaskResult = {
val subTasks: Config = cf.create()
subTasks.setNested("+register", ecsTaskRegisterSubTask(scriptsLocationPrefix))
subTasks.setNested("+register", ecsTaskRegisterSubTask())
subTasks.setNested("+run", ecsTaskRunInternalSubTask())
subTasks.setNested("+wait", ecsTaskWaitSubTask())
subTasks.setNested("+result", ecsTaskResultSubTask(scriptsLocationPrefix))
subTasks.setNested("+result", ecsTaskResultSubTask())

val builder = TaskResult.defaultBuilder(cf)
builder.subtaskConfig(subTasks)
builder.build()
}

protected def ecsTaskRegisterSubTask(scriptsLocationPrefix: AmazonS3URI): Config = {
protected def ecsTaskRegisterSubTask(): Config = {
withDefaultSubTask { subTask =>
subTask.set("_type", "ecs_task.register")
subTask.set("_command", taskDefinitionConfig(scriptsLocationPrefix))
subTask.set("_command", taskDefinitionConfig())
}
}

Expand Down Expand Up @@ -141,10 +144,10 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String
}
}

protected def ecsTaskResultSubTask(resultLocationPrefix: AmazonS3URI): Config = {
protected def ecsTaskResultSubTask(): Config = {
withDefaultSubTask { subTask =>
subTask.set("_type", "ecs_task.command_result_internal")
subTask.set("_command", resultLocationPrefix.toString)
subTask.set("_command", scriptsLocationPrefix.toString)
}
}

Expand All @@ -162,10 +165,10 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String
subTask
}

protected def taskDefinitionConfig(scriptsLocationPrefix: AmazonS3URI): Config = {
protected def taskDefinitionConfig(): Config = {
val c: Config = cf.create()

c.set("container_definitions", (Seq(containerDefinitionConfig(scriptsLocationPrefix)) ++ sidecars).asJava)
c.set("container_definitions", (Seq(containerDefinitionConfig()) ++ sidecars).asJava)
c.setOptional("cpu", cpu)
c.setOptional("execution_role_arn", executionRoleArn)
c.set("family", family)
Expand All @@ -178,16 +181,16 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String
c
}

protected def containerDefinitionConfig(scriptsLocationPrefix: AmazonS3URI): Config = {
protected def containerDefinitionConfig(): Config = {
val c: Config = cf.create()

val command: Seq[String] = Seq("sh", "-c", s"aws s3 cp ${scriptsLocationPrefix.toString}/run.sh ./ && sh run.sh")
val command: Seq[String] = Seq("sh", "-c", s"aws s3 cp ${scriptsLocationPrefix.toString}/$script ./ && sh $script")
logger.info(s"Run in the container: ${command.mkString(" ")}")
c.set("command", command.asJava)
c.setOptional("disable_networking", disableNetworking)
c.set("dns_search_domains", dnsSearchDomains.asJava)
c.set("dns_servers", dnsServers.asJava)
val additionalLabels: Map[String, String] = Map("pro.civitaspo.digdag.plugin.ecs_task.version" -> "0.0.3")
val additionalLabels: Map[String, String] = Map("pro.civitaspo.digdag.plugin.ecs_task.version" -> "0.0.4")
c.set("docker_labels", (dockerLabels ++ additionalLabels).asJava)
c.set("entry_point", entryPoint.asJava)
c.set("environment", (configEnvironment ++ environments).asJava)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package pro.civitaspo.digdag.plugin.ecs_task.embulk
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Path}

import com.amazonaws.services.s3.AmazonS3URI
import io.digdag.client.config.Config
import io.digdag.spi.{OperatorContext, TemplateEngine}
import org.apache.commons.io.FileUtils
import pro.civitaspo.digdag.plugin.ecs_task.AbstractEcsTaskOperator
import pro.civitaspo.digdag.plugin.ecs_task.aws.AmazonS3UriWrapper
import pro.civitaspo.digdag.plugin.ecs_task.command.{EcsTaskCommandOperator, EcsTaskCommandRunner}
import pro.civitaspo.digdag.plugin.ecs_task.util.{TryWithResource, WorkspaceWithTempDir}

import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.{Random, Try}

/**
  * Operator that stages an embulk configuration, a `run.sh` launcher and a copy of the
  * workspace on S3, then delegates to an [[EcsTaskCommandRunner]] to run `run.sh` on ECS.
  *
  * `params`, `workspace`, `logger`, `aws` and `sessionUuid` are not declared in this class;
  * they are presumably provided by [[AbstractEcsTaskOperator]] (verify against that class).
  *
  * @param operatorName   name of this operator; also used in the generated S3 key.
  * @param context        operator context; source of the privileged variables.
  * @param systemConfig   passed through to [[AbstractEcsTaskOperator]].
  * @param templateEngine used to render the embulk config and the `run.sh` template.
  */
class EcsTaskEmbulkOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine)
    extends AbstractEcsTaskOperator(operatorName, context, systemConfig, templateEngine)
    with EcsTaskCommandOperator {

  /** Classpath location of the `run.sh` template bundled with the plugin. */
  private val runShResourcePath: String = "/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh"

  /**
    * Rendered embulk configuration.
    *
    * First `_command` is treated as the path of a file in the workspace and that file is
    * rendered with the template engine. If any step of that fails, `_command` is read as a
    * nested config instead and its string form is rendered.
    */
  protected val embulkConfig: String = {
    val renderedFromFile: Try[String] = Try {
      val embulkConfigPath: String = params.get("_command", classOf[String])
      val f: File = workspace.getFile(embulkConfigPath)
      workspace.templateFile(templateEngine, f.getPath, UTF_8, params)
    }
    renderedFromFile.getOrElse {
      // Named differently from the member on purpose: the original local shadowed `embulkConfig`.
      val inlineConfig: Config = params.getNested("_command")
      templateEngine.template(inlineConfig.toString, params)
    }
  }

  /**
    * S3 location used for staging inputs and collecting results:
    * `<workspace_s3_uri_prefix>/<operatorName>.<sessionUuid>.<10 random alphanumerics>`.
    * Exactly one `/` separates the configured prefix from the generated part.
    */
  protected val workspaceS3UriPrefix: AmazonS3URI = {
    val parent: String = params.get("workspace_s3_uri_prefix", classOf[String])
    val random: String = Random.alphanumeric.take(10).mkString
    val separator: String = if (parent.endsWith("/")) "" else "/"
    AmazonS3UriWrapper(s"$parent$separator$operatorName.$sessionUuid.$random")
  }

  /** Values of the optional `embulk_plugins` list; empty when the key is absent. */
  protected val embulkPlugins: Seq[String] = params.getListOrEmpty("embulk_plugins", classOf[String]).asScala

  /** Builds the runner that executes `run.sh` from [[workspaceS3UriPrefix]]. */
  override def createRunner(): EcsTaskCommandRunner = {
    EcsTaskCommandRunner(
      scriptsLocationPrefix = workspaceS3UriPrefix,
      script = "run.sh",
      params = params,
      environments = additionalEnvironments(),
      awsConf = aws.conf,
      logger = logger
    )
  }

  /** Returns every privileged variable of the operator context as a key/value map. */
  override def additionalEnvironments(): Map[String, String] = {
    val vars = context.getPrivilegedVariables
    vars.getKeys.asScala.map(k => k -> vars.get(k)).toMap
  }

  /**
    * Stages `config.yml`, `run.sh` and a copy of the workspace in a temporary directory and
    * uploads that directory to S3.
    *
    * NOTE(review): the lifetime and location of the directory are decided by
    * `WorkspaceWithTempDir`; presumably it is removed afterwards - confirm.
    */
  override def prepare(): Unit = {
    WorkspaceWithTempDir(workspace) { tempDir: Path =>
      createConfigFile(tempDir)
      createRunShFile(tempDir)
      createWorkspaceDir(tempDir)
      uploadOnS3(tempDir)
    }
  }

  /** Writes the rendered embulk configuration to `<parent>/config.yml`. */
  protected def createConfigFile(parent: Path): Unit = {
    val configFile: Path = Files.createFile(parent.resolve("config.yml"))
    writeFile(file = configFile, content = embulkConfig)
  }

  /**
    * Renders the bundled `run.sh` template and writes it to `<parent>/run.sh`.
    *
    * The template is rendered against a copy of `params` extended with
    * `ECS_TASK_EMBULK_BUCKET`, `ECS_TASK_EMBULK_PREFIX` and `ECS_TASK_EMBULK_SETUP_COMMAND`.
    */
  protected def createRunShFile(parent: Path): Unit = {
    val setupCommand: String =
      if (embulkPlugins.nonEmpty) {
        logger.warn("`embulk_plugins` option is experimental, so please be careful in the plugin update.")
        (Seq("embulk", "gem", "install") ++ embulkPlugins).mkString(" ")
      }
      else "echo 'no setup command'" // default value when no plugin has to be installed

    val dup: Config = params.deepCopy()
    dup.set("ECS_TASK_EMBULK_BUCKET", workspaceS3UriPrefix.getBucket)
    dup.set("ECS_TASK_EMBULK_PREFIX", workspaceS3UriPrefix.getKey)
    dup.set("ECS_TASK_EMBULK_SETUP_COMMAND", setupCommand)

    TryWithResource(classOf[EcsTaskEmbulkOperator].getResourceAsStream(runShResourcePath)) { is =>
      // Decode explicitly as UTF-8 so reading matches the charset used by writeFile,
      // instead of depending on the platform default.
      val runShContentTemplate: String = Source.fromInputStream(is, UTF_8.name()).mkString
      val runShContent: String = templateEngine.template(runShContentTemplate, dup)
      val runShFile: Path = Files.createFile(parent.resolve("run.sh"))
      writeFile(file = runShFile, content = runShContent)
    }
  }

  /**
    * Copies every top-level entry of the workspace, except the one named `.digdag`, into a
    * new `<parent>/workspace` directory. Directories are copied recursively.
    */
  protected def createWorkspaceDir(parent: Path): Unit = {
    // Files.list keeps a directory handle open until the stream is closed, so the entries
    // are materialised and the stream is closed before any copying starts.
    val listing = Files.list(workspace.getPath)
    val targets: List[Path] =
      try listing.iterator().asScala.filterNot(_.endsWith(".digdag")).toList
      finally listing.close()

    val workspacePath: Path = Files.createDirectory(parent.resolve("workspace"))
    targets.foreach { path =>
      logger.info(s"Copy: $path -> $workspacePath")
      if (Files.isDirectory(path)) FileUtils.copyDirectoryToDirectory(path.toFile, workspacePath.toFile)
      else FileUtils.copyFileToDirectory(path.toFile, workspacePath.toFile)
    }
  }

  /** Recursively uploads `path` to [[workspaceS3UriPrefix]] and blocks until the upload completes. */
  protected def uploadOnS3(path: Path): Unit = {
    logger.info(s"Recursive Upload: $path -> ${workspaceS3UriPrefix.getURI}")
    aws.withTransferManager { xfer =>
      val upload = xfer.uploadDirectory(
        workspaceS3UriPrefix.getBucket,
        workspaceS3UriPrefix.getKey,
        path.toFile,
        true // includeSubdirectories
      )
      upload.waitForCompletion()
    }
  }

  /** Writes `content` to `file` as UTF-8 through a writer obtained from the workspace. */
  protected def writeFile(file: Path, content: String): Unit = {
    logger.info(s"Write into ${file.toString}")
    TryWithResource(workspace.newBufferedWriter(file.toString, UTF_8)) { writer =>
      writer.write(content)
    }
  }
}
Loading

0 comments on commit 9746db2

Please sign in to comment.