From 0e766bfb2684b2415549cc7a5938b3e82afcba7c Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Mon, 5 Nov 2018 10:05:55 +0900 Subject: [PATCH 1/9] Make family name more configuable --- .../ecs_task/command/EcsTaskCommandRunner.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala index 4f46817..5c6e168 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala @@ -20,15 +20,11 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String val cpu: Optional[String] = params.getOptional("cpu", classOf[String]) val executionRoleArn: Optional[String] = params.getOptional("execution_role_arn", classOf[String]) - val family: String = params.get( - "family", - classOf[String], - params - .get("task_name", classOf[String]) - .replaceAll("\\+", "_") - .replaceAll("\\^", "_") - .replaceAll("\\=", "_") - ) + val taskName: String = params.get("task_name", classOf[String]) + val familyPrefix: String = params.get("family_prefix", classOf[String], "") + val familySuffix: String = params.get("family_suffix", classOf[String], "") + val familyInfix: String = params.get("family_infix", classOf[String], taskName.replaceAll("\\+", "_").replaceAll("\\^", "_").replaceAll("\\=", "_")) + val family: String = params.get("family", classOf[String], s"$familyPrefix$familyInfix$familySuffix") val memory: Optional[String] = params.getOptional("memory", classOf[String]) val networkMode: Optional[String] = params.getOptional("network_mode", classOf[String]) // NOTE: Use `ecs_task.run>`'s one. From fd8f1aaf2d90455ced718b56ee73a1947de095bd Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 00:35:28 +0900 Subject: [PATCH 2/9] Write README for scripting operators --- README.md | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/README.md b/README.md index 40e02d9..4f24d7a 100644 --- a/README.md +++ b/README.md @@ -125,6 +125,83 @@ In addition, the below configurations exist. - the values are string to object map. - the usage follows [Digdag Python API](https://docs.digdag.io/python_api.html), [Digdag Ruby API](https://docs.digdag.io/ruby_api.html). +# (Experimental) Scripting Operators + +[digdag-operator-ecs_task](https://github.com/civitaspo/digdag-operator-ecs_task) supports some [scripting operators](https://docs.digdag.io/operators/scripting.html) such as `ecs_task.py`, `ecs_task.rb`. Originally I wanted to provide `ecs_task` as one of the implementations of `CommandExecutor` provided by digdag, but users cannot extend the current CommandExecutor as written in this issue: [\[feature-request\] Use Custom CommandExecutors](https://github.com/treasure-data/digdag/issues/901). Therefore, this plugin implements Scripting Operator on its own. Of course, the usage is the same as the Scripting Operator provided by digdag. When the issue is resolved, I will reimplement it using the `CommandExecutor` of digdag. + +## Scripting Operators Common Configurations + +- **sidecars**: A list of container definitions except the container for scripting operator. (array of map, optional) + - The configuration map is the same as the snake-cased [API_ContainerDefinition](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html) +- **cpu**: The number of CPU units used by the task. It can be expressed as an integer using CPU units, for example `1024`, or as a string using vCPUs, for example `1 vCPU` or `1 vcpu`, in a task definition. String values are converted to an integer indicating the CPU units when the task definition is registered. (string, optional) + - See the docs for more info: [ECS-RegisterTaskDefinition-request-cpu](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html#ECS-RegisterTaskDefinition-request-cpu) +- **execution_role_arn**: The Amazon Resource Name (ARN) of the task execution role that the Amazon ECS container agent and the Docker daemon can assume. (string, optional) +- **family_prefix**: The family name prefix for a task definition. This is used if **family** is not defined. (string, default: `""`) +- **family_infix**: The family name infix for a task definition. This is used if **family** is not defined. (string, default: `"${task_name}"`) + - The default value is replaced as below: + - `+` -> `_` + - `^` -> `_` + - `=` -> `_` +- **family_suffix**: The family name sufix for a task definition. This is used if **family** is not defined. (string, default: `""`) +- **family**: You must specify a `family` for a task definition, which allows you to track multiple versions of the same task definition. The `family` is used as a name for your task definition. Up to 255 letters (uppercase and lowercase), numbers, hyphens, and underscores are allowed. (string, default: `"${family_prefix}${family_infix}${family_suffix}"`) +- **memory**: The amount of memory (in MiB) used by the task. It can be expressed as an integer using MiB, for example `1024`, or as a string using GB, for example `1GB` or `1 GB`, in a task definition. String values are converted to an integer indicating the MiB when the task definition is registered. (string, optional) + - See the docs for more info: [ECS-RegisterTaskDefinition-request-memory](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RegisterTaskDefinition.html#ECS-RegisterTaskDefinition-request-memory) +- **network_mode**: The Docker networking mode to use for the containers in the task. The valid values are `none`, `bridge`, `awsvpc`, and `host`. The default Docker network mode is `bridge`. If using the Fargate launch type, the `awsvpc` network mode is required. If using the EC2 launch type, any network mode can be used. If the network mode is set to `none`, you can't specify port mappings in your container definitions, and the task's containers do not have external connectivity. The `host` and `awsvpc` network modes offer the highest networking performance for containers because they use the EC2 network stack instead of the virtualized network stack provided by the `bridge` mode. With the `host` and `awsvpc` network modes, exposed container ports are mapped directly to the corresponding host port (for the `host` network mode) or the attached elastic network interface port (for the `awsvpc` network mode), so you cannot take advantage of dynamic host port mappings. If the network mode is `awsvpc`, the task is allocated an Elastic Network Interface, and you must specify the **network_configuration** option when you create a service or run a task with the task definition. For more information, see [Task Networking](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-networking.html) in the Amazon Elastic Container Service Developer Guide. If the network mode is `host`, you can't run multiple instantiations of the same task on a single container instance when port mappings are used. Docker for Windows uses different network modes than Docker for Linux. When you register a task definition with Windows containers, you must not specify a network mode. (string, optional) +- **requires_compatibilities**: The launch type required by the task. If no value is specified, it defaults to `EC2`. (string, optional) +- **task_role_arn**: The short name or full Amazon Resource Name (ARN) of the IAM role that containers in this task can assume. All containers in this task are granted the permissions that are specified in this role. (string, optional) +- **volumes**: A list of volume definitions. (array of map, optional) + - The configuration map is the same as the snake-cased [API_Volume](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_Volume.html). +- **disable_networking**: When this parameter is `true`, networking is disabled within the container. (boolean, optional) +- **dns_search_domains**: A list of DNS search domains that are presented to the container. (array of string, optional) +- **dns_servers**: A list of DNS servers that are presented to the container. (array of string, optional) +- **docker_labels**: A key/value map of labels to add to the container. (string to string map, optional) +- **docker_security_options**: A list of strings to provide custom labels for SELinux and AppArmor multi-level security systems. This field is not valid for containers in tasks using the `Fargate` launch type. For more information, see [ECS-Type-ContainerDefinition-dockerSecurityOptions](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-dockerSecurityOptions). (array of string, optional) +- **entry_point**: The entry point that is passed to the container. (array of string, optional) +- **environments**: The environment variables to pass to a container. (string to string map, optional) +- **extra_hosts**: A list of hostnames and IP address mappings to append to the `/etc/hosts` file on the container. This parameter is not supported for Windows containers or tasks that use the `awsvpc` network mode. (string to string map, optional) +- **health_check**: The health check command and associated configuration parameters for the container. The configuration map is the same as the snake-cased [APIReference/API_HealthCheck](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_HealthCheck.html). (map, optional) +- **hostname**: The hostname to use for your container. (string, optional) +- **image**: The image used to start a container. This string is passed directly to the Docker daemon. Images in the Docker Hub registry are available by default. Other repositories are specified with either `repository-url/image:tag` or `repository-url/image@digest`. Up to 255 letters (uppercase and lowercase), numbers, hyphens, underscores, colons, periods, forward slashes, and number signs are allowed. For more information, see [ECS-Type-ContainerDefinition-image](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-image). (string, optional) +- **interactive**: When this parameter is true, this allows you to deploy containerized applications that require `stdin` or a `tty` to be allocated. (boolean, optional) +- **links**: The `link` parameter allows containers to communicate with each other without the need for port mappings. Only supported if the network mode of a task definition is set to `bridge`. The `name:internalName` construct is analogous to `name:alias` in Docker links. Up to 255 letters (uppercase and lowercase), numbers, hyphens, and underscores are allowed. (array of string, optional) +- **linux_parameters**: Linux-specific modifications that are applied to the container, such as Linux [KernelCapabilities](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_KernelCapabilities.html). The configuration map is the same as the snake-cased [API_LinuxParameters](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_LinuxParameters.html). (map, optional) +- **log_configuration**: The log configuration specification for the container. For more information, see [ECS-Type-ContainerDefinition-logConfiguration](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-logConfiguration). The configuration map is the same as the snake-cased [API_LogConfiguration](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_LogConfiguration.html). (map, optional) +- **mount_points**: The mount points for data volumes in your container. (array of map, optional) + - The configuration map is the same as the snake-cased [API_MountPoint](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_MountPoint.html). +- **container_name**: The name of a container. (string, default: the same as **family**) +- **port_mappings**: The list of port mappings for the container. Port mappings allow containers to access ports on the host container instance to send or receive traffic. For more informaiton, see [ECS-Type-ContainerDefinition-portMappings](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-portMappings). (array of map, optional) + - The configuration map is the same as the snake-cased [API_PortMapping](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_PortMapping.html). +- **privileged**: When this parameter is `true`, the container is given elevated privileges on the host container instance (similar to the `root` user). (boolean, optional) +- **pseudo_terminal**: When this parameter is `true`, a TTY is allocated. (boolean, optional) +- **readonly_root_filesystem**: When this parameter is `true`, the container is given read-only access to its root file system. (boolean, optional) +- **repository_credentials**: The private repository authentication credentials to use. The configuration map is the same as the snake-cased [API_RepositoryCredentials](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RepositoryCredentials.html). (map, optional) +- **system_controls**: A list of namespaced kernel parameters to set in the container. For more information, see [ECS-Type-ContainerDefinition-systemControls](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerDefinition.html#ECS-Type-ContainerDefinition-systemControls). (array of map, optional) + - The configuration map is the same as the snake-cased [API_SystemControl](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_SystemControl.html). +- **ulimits**: A list of ulimits to set in the container. (array of map, optional) + - The configuration map is the same as the snake-cased [API_Ulimit](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_Ulimit.html). +- **user**: The user name to use inside the container. (string, optional) +- **volumes_from**: Data volumes to mount from another container. (array of map, optional) + - The configuration map is the same as the snake-cased [API_VolumeFrom](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_VolumeFrom.html) +- **working_directory**: The working directory in which to run commands inside the container. (string, optional) +- **cluster**: The short name or full Amazon Resource Name (ARN) of the cluster on which to run your task. (string, required) +- **count**: The number of instantiations of the specified task to place on your cluster. You can specify up to 10 tasks per call. (integer, optional) +- **group**: The name of the task group to associate with the task. The default value is the family name of the task definition (for example, family:my-family-name). (string, optional) +- **launch_type**: The launch type on which to run your task. Valid values are `EC2`, `FARGATE`. (string, optional) +- **network_configuration**: The network configuration for the task. This parameter is required for task definitions that use the `awsvpc` network mode to receive their own Elastic Network Interface, and it is not supported for other network modes. For more information, see [Task Networking](http://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-networking.html) in the Amazon Elastic Container Service Developer Guide. The configuration map is the same as the snake-cased [API_NetworkConfiguration](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_NetworkConfiguration.html). (map, optional) +- **overrides**: A list of container overrides that specify the name of a container in the specified task definition and the overrides it should receive. The configuration map is the same as the snake-cased [API_TaskOverride](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html). (map, optional) +- **placement_constraints**: An array of placement constraint objects to use for the task. You can specify up to 10 constraints per task. (array of map, optional) + - The configuration map is the same as the snake-cased [API_PlacementConstraint](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_PlacementConstraint.html). +- **placement_strategy**: The placement strategy objects to use for the task. You can specify a maximum of five strategy rules per task. (array of map, optional) + - The configuration map is the same as the snake-cased [API_PlacementStrategy](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_PlacementStrategy.html). +- **platform_version**: The platform version on which to run your task. If one is not specified, the latest version is used by default. (string, optional) +- **started_by**: An optional tag specified when a task is started. (string, optional) + +## Configuration for `ecs_task.py>` operator + +- **ecs_task.py>**: Name of a method to run. The format is `[PACKAGE.CLASS.]METHOD`. (string, required) +- **workspace_s3_uri_prefix**: S3 uri prefix for using as workspace. (string, required) +- **pip_install**: packages to install before task running. (array of string, optional) + # Development ## Run an Example From 12062e12850b43875ef70a34958e8855505f4ec0 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 01:40:49 +0900 Subject: [PATCH 3/9] Write about the risk of the scripting operators. --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4f24d7a..03438f6 100644 --- a/README.md +++ b/README.md @@ -195,11 +195,12 @@ In addition, the below configurations exist. - The configuration map is the same as the snake-cased [API_PlacementStrategy](https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_PlacementStrategy.html). - **platform_version**: The platform version on which to run your task. If one is not specified, the latest version is used by default. (string, optional) - **started_by**: An optional tag specified when a task is started. (string, optional) +- **workspace_s3_uri_prefix**: S3 uri prefix for using as workspace. (string, required) + - Currently, input params, output params, stdout, stderr, and internal scripts are put on S3, and then they are not removed. So it's insecure unless strict access control to S3. ## Configuration for `ecs_task.py>` operator - **ecs_task.py>**: Name of a method to run. The format is `[PACKAGE.CLASS.]METHOD`. (string, required) -- **workspace_s3_uri_prefix**: S3 uri prefix for using as workspace. (string, required) - **pip_install**: packages to install before task running. (array of string, optional) # Development From 5ded0abfb97ec7dc4e4d1850b86659e2cd22fe48 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 02:20:43 +0900 Subject: [PATCH 4/9] Write `ecs_task.embulk>` operator specification. --- README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/README.md b/README.md index 03438f6..23c5a45 100644 --- a/README.md +++ b/README.md @@ -203,6 +203,13 @@ In addition, the below configurations exist. - **ecs_task.py>**: Name of a method to run. The format is `[PACKAGE.CLASS.]METHOD`. (string, required) - **pip_install**: packages to install before task running. (array of string, optional) +## Configuration for `ecs_task.embulk>` operator + +- **ecs_task.embulk>**: Embulk config yaml or file. You can use digdag's template engine like `${...}` in the config yaml or file. (string or map, required) + - For more information, see [Embulk Docs](http://www.embulk.org/docs/index.html). +- **embulk_plugins**: packages to install before task running. (array of string, optional) + - You can see the plugins in [Embulk Plugins](http://www.embulk.org/plugins/). + # Development ## Run an Example From 0924afbbcb40789246f959e335d7d9a24db62825 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 02:50:21 +0900 Subject: [PATCH 5/9] Refactor scripting operator core logic --- .../command/EcsTaskCommandOperator.scala | 7 +++--- .../command/EcsTaskCommandRunner.scala | 24 +++++++++---------- .../ecs_task/py/EcsTaskPyOperator.scala | 20 ++++++++++------ 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandOperator.scala index 1c4ef4d..fa05d78 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandOperator.scala @@ -4,14 +4,15 @@ import io.digdag.spi.TaskResult trait EcsTaskCommandOperator { - val runner: EcsTaskCommandRunner + def createRunner(): EcsTaskCommandRunner def additionalEnvironments(): Map[String, String] - def uploadScript(): AmazonS3URI + def prepare(): Unit def runTask(): TaskResult = { - runner.run(scriptsLocationPrefix = uploadScript()) + prepare() + createRunner().run() } } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala index 5c6e168..8fe721b 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala @@ -9,7 +9,7 @@ import pro.civitaspo.digdag.plugin.ecs_task.aws.AwsConf import scala.collection.JavaConverters._ -case class EcsTaskCommandRunner(params: Config, environments: Map[String, String], awsConf: AwsConf, logger: Logger) { +case class EcsTaskCommandRunner(scriptsLocationPrefix: AmazonS3URI, script: String, params: Config, environments: Map[String, String], awsConf: AwsConf, logger: Logger) { val cf: ConfigFactory = params.getFactory @@ -91,22 +91,22 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String // For ecs_task.wait operator val timeout: DurationParam = params.get("timeout", classOf[DurationParam], DurationParam.parse("15m")) - def run(scriptsLocationPrefix: AmazonS3URI): TaskResult = { + def run(): TaskResult = { val subTasks: Config = cf.create() - subTasks.setNested("+register", ecsTaskRegisterSubTask(scriptsLocationPrefix)) + subTasks.setNested("+register", ecsTaskRegisterSubTask()) subTasks.setNested("+run", ecsTaskRunInternalSubTask()) subTasks.setNested("+wait", ecsTaskWaitSubTask()) - subTasks.setNested("+result", ecsTaskResultSubTask(scriptsLocationPrefix)) + subTasks.setNested("+result", ecsTaskResultSubTask()) val builder = TaskResult.defaultBuilder(cf) builder.subtaskConfig(subTasks) builder.build() } - protected def ecsTaskRegisterSubTask(scriptsLocationPrefix: AmazonS3URI): Config = { + protected def ecsTaskRegisterSubTask(): Config = { withDefaultSubTask { subTask => subTask.set("_type", "ecs_task.register") - subTask.set("_command", taskDefinitionConfig(scriptsLocationPrefix)) + subTask.set("_command", taskDefinitionConfig()) } } @@ -137,10 +137,10 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String } } - protected def ecsTaskResultSubTask(resultLocationPrefix: AmazonS3URI): Config = { + protected def ecsTaskResultSubTask(): Config = { withDefaultSubTask { subTask => subTask.set("_type", "ecs_task.command_result_internal") - subTask.set("_command", resultLocationPrefix.toString) + subTask.set("_command", scriptsLocationPrefix.toString) } } @@ -158,10 +158,10 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String subTask } - protected def taskDefinitionConfig(scriptsLocationPrefix: AmazonS3URI): Config = { + protected def taskDefinitionConfig(): Config = { val c: Config = cf.create() - c.set("container_definitions", (Seq(containerDefinitionConfig(scriptsLocationPrefix)) ++ sidecars).asJava) + c.set("container_definitions", (Seq(containerDefinitionConfig()) ++ sidecars).asJava) c.setOptional("cpu", cpu) c.setOptional("execution_role_arn", executionRoleArn) c.set("family", family) @@ -174,10 +174,10 @@ case class EcsTaskCommandRunner(params: Config, environments: Map[String, String c } - protected def containerDefinitionConfig(scriptsLocationPrefix: AmazonS3URI): Config = { + protected def containerDefinitionConfig(): Config = { val c: Config = cf.create() - val command: Seq[String] = Seq("sh", "-c", s"aws s3 cp ${scriptsLocationPrefix.toString}/run.sh ./ && sh run.sh") + val command: Seq[String] = Seq("sh", "-c", s"aws s3 cp ${scriptsLocationPrefix.toString}/$script ./ && sh $script") logger.info(s"Run in the container: ${command.mkString(" ")}") c.set("command", command.asJava) c.setOptional("disable_networking", disableNetworking) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala index ba91422..414d49e 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala @@ -14,7 +14,6 @@ import scala.collection.JavaConverters._ import scala.io.Source import scala.language.reflectiveCalls import scala.util.Random -import scala.util.hashing.MurmurHash3 class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) extends AbstractEcsTaskOperator(operatorName, context, systemConfig, templateEngine) @@ -27,13 +26,21 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo protected val workspaceS3UriPrefix: AmazonS3URI = { val parent: String = params.get("workspace_s3_uri_prefix", classOf[String]) val random: String = Random.alphanumeric.take(10).mkString - if (parent.endsWith("/")) AmazonS3UriWrapper(s"${parent}ecs_task.py.$sessionUuid.$random") - else AmazonS3UriWrapper(s"$parent/ecs_task.py.$sessionUuid.$random") + if (parent.endsWith("/")) AmazonS3UriWrapper(s"$parent$operatorName.$sessionUuid.$random") + else AmazonS3UriWrapper(s"$parent/$operatorName.$sessionUuid.$random") } protected val pipInstall: Seq[String] = params.getListOrEmpty("pip_install", classOf[String]).asScala - override val runner: EcsTaskCommandRunner = - EcsTaskCommandRunner(params = params, environments = additionalEnvironments(), awsConf = aws.conf, logger = logger) + override def createRunner(): EcsTaskCommandRunner = { + EcsTaskCommandRunner( + scriptsLocationPrefix = workspaceS3UriPrefix, + script = "run.sh", + params = params, + environments = additionalEnvironments(), + awsConf = aws.conf, + logger = logger + ) + } override def additionalEnvironments(): Map[String, String] = { val vars = context.getPrivilegedVariables @@ -44,7 +51,7 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo builder.result() } - override def uploadScript(): AmazonS3URI = { + override def prepare(): Unit = { withTempDir(operatorName) { tempDir: Path => createInFile(tempDir) createRunnerPyFile(tempDir) @@ -52,7 +59,6 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo createWorkspaceDir(tempDir) uploadOnS3(tempDir) } - workspaceS3UriPrefix } protected def createInFile(parent: Path): Unit = { From 5abbd32faa4a252abd01999f1ec5db0140026711 Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 03:22:15 +0900 Subject: [PATCH 6/9] Move `using` method to util package as `TryWithResource` --- .../plugin/ecs_task/py/EcsTaskPyOperator.scala | 12 ++++-------- .../plugin/ecs_task/util/TryWithResource.scala | 10 ++++++++++ 2 files changed, 14 insertions(+), 8 deletions(-) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/TryWithResource.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala index 414d49e..1ca4834 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala @@ -9,6 +9,7 @@ import org.apache.commons.io.FileUtils import pro.civitaspo.digdag.plugin.ecs_task.AbstractEcsTaskOperator import pro.civitaspo.digdag.plugin.ecs_task.aws.AmazonS3UriWrapper import pro.civitaspo.digdag.plugin.ecs_task.command.{EcsTaskCommandOperator, EcsTaskCommandRunner} +import pro.civitaspo.digdag.plugin.ecs_task.util.TryWithResource import scala.collection.JavaConverters._ import scala.io.Source @@ -68,7 +69,7 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo } protected def createRunnerPyFile(parent: Path): Unit = { - using(classOf[EcsTaskPyOperator].getResourceAsStream(runnerPyResourcePath)) { is => + TryWithResource(classOf[EcsTaskPyOperator].getResourceAsStream(runnerPyResourcePath)) { is => val runnerPyContent: String = Source.fromInputStream(is).mkString val runnerPyFile: Path = Files.createFile(parent.resolve("runner.py")) writeFile(file = runnerPyFile, content = runnerPyContent) @@ -88,7 +89,7 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo dup.set("ECS_TASK_PY_SETUP_COMMAND", cmd) } - using(classOf[EcsTaskPyOperator].getResourceAsStream(runShResourcePath)) { is => + TryWithResource(classOf[EcsTaskPyOperator].getResourceAsStream(runShResourcePath)) { is => val runShContentTemplate: String = Source.fromInputStream(is).mkString val runShContent: String = templateEngine.template(runShContentTemplate, dup) val runShFile: Path = Files.createFile(parent.resolve("run.sh")) @@ -121,16 +122,11 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo protected def writeFile(file: Path, content: String): Unit = { logger.info(s"Write into ${file.toString}") - using(workspace.newBufferedWriter(file.toString, UTF_8)) { writer => + TryWithResource(workspace.newBufferedWriter(file.toString, UTF_8)) { writer => writer.write(content) } } - protected def using[A <: { def close() }, B](resource: A)(f: A => B): B = { - try f(resource) - finally resource.close() - } - // ref. https://github.com/muga/digdag/blob/aff3dfab0b91aa6787d7921ce34d5b3b21947c20/digdag-plugin-utils/src/main/java/io/digdag/util/Workspace.java#L84-L95 protected def withTempDir[T](prefix: String)(f: Path => T): T = { val dir = workspace.getProjectPath.resolve(".digdag/tmp") diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/TryWithResource.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/TryWithResource.scala new file mode 100644 index 0000000..33f29d6 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/TryWithResource.scala @@ -0,0 +1,10 @@ +package pro.civitaspo.digdag.plugin.ecs_task.util + +object TryWithResource { + + def apply[A <: { def close() }, B](resource: A)(f: A => B): B = { + try f(resource) + finally resource.close() + } + +} From 4aed34c7106096c58b633b96eb8ecd6c47a93e6f Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 03:29:05 +0900 Subject: [PATCH 7/9] Move `withTempDir` method to util package as `WorkspaceTempDir` --- .../ecs_task/py/EcsTaskPyOperator.scala | 13 ++---------- .../ecs_task/util/WorkspaceWithTempDir.scala | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+), 11 deletions(-) create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/WorkspaceWithTempDir.scala diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala index 1ca4834..f57fd31 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/py/EcsTaskPyOperator.scala @@ -9,7 +9,7 @@ import org.apache.commons.io.FileUtils import pro.civitaspo.digdag.plugin.ecs_task.AbstractEcsTaskOperator import pro.civitaspo.digdag.plugin.ecs_task.aws.AmazonS3UriWrapper import pro.civitaspo.digdag.plugin.ecs_task.command.{EcsTaskCommandOperator, EcsTaskCommandRunner} -import pro.civitaspo.digdag.plugin.ecs_task.util.TryWithResource +import pro.civitaspo.digdag.plugin.ecs_task.util.{TryWithResource, WorkspaceWithTempDir} import scala.collection.JavaConverters._ import scala.io.Source @@ -53,7 +53,7 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo } override def prepare(): Unit = { - withTempDir(operatorName) { tempDir: Path => + WorkspaceWithTempDir(workspace) { tempDir: Path => createInFile(tempDir) createRunnerPyFile(tempDir) createRunShFile(tempDir) @@ -127,13 +127,4 @@ class EcsTaskPyOperator(operatorName: String, context: OperatorContext, systemCo } } - // ref. https://github.com/muga/digdag/blob/aff3dfab0b91aa6787d7921ce34d5b3b21947c20/digdag-plugin-utils/src/main/java/io/digdag/util/Workspace.java#L84-L95 - protected def withTempDir[T](prefix: String)(f: Path => T): T = { - val dir = workspace.getProjectPath.resolve(".digdag/tmp") - Files.createDirectories(dir) - val tempDir: Path = Files.createTempDirectory(dir, prefix) - try f(tempDir) - finally FileUtils.deleteDirectory(tempDir.toFile) - } - } diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/WorkspaceWithTempDir.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/WorkspaceWithTempDir.scala new file mode 100644 index 0000000..f731e38 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/util/WorkspaceWithTempDir.scala @@ -0,0 +1,21 @@ +package pro.civitaspo.digdag.plugin.ecs_task.util +import java.nio.file.{Files, Path} + +import io.digdag.util.Workspace +import org.apache.commons.io.FileUtils + +import scala.util.Random + +// ref. https://github.com/muga/digdag/blob/aff3dfab0b91aa6787d7921ce34d5b3b21947c20/digdag-plugin-utils/src/main/java/io/digdag/util/Workspace.java#L84-L95 +object WorkspaceWithTempDir { + + def apply[T](workspace: Workspace)(f: Path => T): T = { + val dir = workspace.getProjectPath.resolve(".digdag/tmp") + Files.createDirectories(dir) + val random: String = Random.alphanumeric.take(10).mkString + val tempDir: Path = Files.createTempDirectory(dir, random) + try f(tempDir) + finally FileUtils.deleteDirectory(tempDir.toFile) + } + +} From 91ba99cd447984e9470f05ce65faf71342a8632e Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 04:05:29 +0900 Subject: [PATCH 8/9] Implement ecs_task.embulk operator --- .../digdag/plugin/ecs_task/embulk/run.sh | 60 +++++++++ .../plugin/ecs_task/EcsTaskPlugin.scala | 2 + ...EcsTaskCommandResultInternalOperator.scala | 8 +- .../embulk/EcsTaskEmbulkOperator.scala | 124 ++++++++++++++++++ 4 files changed, 191 insertions(+), 3 deletions(-) create mode 100644 src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh create mode 100644 src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala diff --git a/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh b/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh new file mode 100644 index 0000000..9df90e1 --- /dev/null +++ b/src/main/resources/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh @@ -0,0 +1,60 @@ +set -ex +set -o pipefail + +mkdir -p ./digdag-operator-ecs_task +cd digdag-operator-ecs_task + +# Create output files +touch out.json stdout.log stderr.log + +# Download requirements +aws s3 cp s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/ ./ --recursive + +# Move workspace +cd workspace + +# Unset e option for returning embulk results to digdag +set +e + +# Run setup command +${ECS_TASK_EMBULK_SETUP_COMMAND} \ + 2>> ../stderr.log \ + | tee -a ../stdout.log + +# Run +embulk run ../config.yml \ + 2>> ../stderr.log \ + | tee -a ../stdout.log + +# Capture exit code +EXIT_CODE=$? + +# Set e option +set -e + +# Move out workspace +cd .. + +# For logging driver +cat stderr.log 1>&2 + +# Write out.json +cat < out.json +{ + "subtask_config": {}, + "export_params": {}, + "store_params": {}, + "status_params": { + "exit_code": $EXIT_CODE + } +} +EOF + +# Upload results +aws s3 cp ./out.json s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/ +aws s3 cp ./stdout.log s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/ +aws s3 cp ./stderr.log s3://${ECS_TASK_EMBULK_BUCKET}/${ECS_TASK_EMBULK_PREFIX}/ + +# Exit with the embulk exit code +exit $EXIT_CODE + diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/EcsTaskPlugin.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/EcsTaskPlugin.scala index 3b712ce..2d03fc6 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/EcsTaskPlugin.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/EcsTaskPlugin.scala @@ -7,6 +7,7 @@ import io.digdag.client.config.Config import io.digdag.spi.{Operator, OperatorContext, OperatorFactory, OperatorProvider, Plugin, TemplateEngine} import javax.inject.Inject import pro.civitaspo.digdag.plugin.ecs_task.command.EcsTaskCommandResultInternalOperator +import pro.civitaspo.digdag.plugin.ecs_task.embulk.EcsTaskEmbulkOperator import pro.civitaspo.digdag.plugin.ecs_task.py.EcsTaskPyOperator import pro.civitaspo.digdag.plugin.ecs_task.register.EcsTaskRegisterOperator import pro.civitaspo.digdag.plugin.ecs_task.result.EcsTaskResultOperator @@ -22,6 +23,7 @@ object EcsTaskPlugin { override def get(): JList[OperatorFactory] = { JArrays.asList( + operatorFactory("ecs_task.embulk", classOf[EcsTaskEmbulkOperator]), operatorFactory("ecs_task.py", classOf[EcsTaskPyOperator]), operatorFactory("ecs_task.command_result_internal", classOf[EcsTaskCommandResultInternalOperator]), operatorFactory("ecs_task.register", classOf[EcsTaskRegisterOperator]), diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandResultInternalOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandResultInternalOperator.scala index 3810dda..26f45b1 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandResultInternalOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandResultInternalOperator.scala @@ -20,9 +20,11 @@ class EcsTaskCommandResultInternalOperator(operatorName: String, context: Operat val exitCode: Int = statusParams.get("exit_code", classOf[Int]) if (exitCode != 0) { - val errorMessage: String = statusParams.get("error_message", classOf[String]) - val errorStackTrace: String = statusParams.get("error_stacktrace", classOf[String]) - throw new RuntimeException(s"message: $errorMessage, stacktrace: $errorStackTrace") + val errorMessage: String = statusParams.get("error_message", classOf[String], "") + val errorStackTrace: String = statusParams.get("error_stacktrace", classOf[String], "") + val stdout: String = Try(loadStdoutLogContent()).getOrElse("") + val stderr: String = Try(loadStderrLogContent()).getOrElse("") + throw new RuntimeException(s"message: '$errorMessage',\nstacktrace: '$errorStackTrace',\nstdout: '$stdout'\nstderr: '$stderr'") } TaskResult diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala new file mode 100644 index 0000000..cab1f80 --- /dev/null +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala @@ -0,0 +1,124 @@ +package pro.civitaspo.digdag.plugin.ecs_task.embulk +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 +import java.nio.file.{Files, Path} + +import com.amazonaws.services.s3.AmazonS3URI +import io.digdag.client.config.Config +import io.digdag.spi.{OperatorContext, TemplateEngine} +import org.apache.commons.io.FileUtils +import pro.civitaspo.digdag.plugin.ecs_task.AbstractEcsTaskOperator +import pro.civitaspo.digdag.plugin.ecs_task.aws.AmazonS3UriWrapper +import pro.civitaspo.digdag.plugin.ecs_task.command.{EcsTaskCommandOperator, EcsTaskCommandRunner} +import pro.civitaspo.digdag.plugin.ecs_task.util.{TryWithResource, WorkspaceWithTempDir} + +import scala.collection.JavaConverters._ +import scala.io.Source +import scala.util.{Random, Try} + +class EcsTaskEmbulkOperator(operatorName: String, context: OperatorContext, systemConfig: Config, templateEngine: TemplateEngine) + extends AbstractEcsTaskOperator(operatorName, context, systemConfig, templateEngine) + with EcsTaskCommandOperator { + + private val runShResourcePath: String = "/pro/civitaspo/digdag/plugin/ecs_task/embulk/run.sh" + + protected val embulkConfig: String = { + val t: Try[String] = Try { + val embulkConfigPath: String = params.get("_command", classOf[String]) + val f: File = workspace.getFile(embulkConfigPath) + workspace.templateFile(templateEngine, f.getPath, UTF_8, params) + } + t.getOrElse { + val embulkConfig: Config = params.getNested("_command") + templateEngine.template(embulkConfig.toString, params) + } + } + protected val workspaceS3UriPrefix: AmazonS3URI = { + val parent: String = params.get("workspace_s3_uri_prefix", classOf[String]) + val random: String = Random.alphanumeric.take(10).mkString + if (parent.endsWith("/")) AmazonS3UriWrapper(s"$parent$operatorName.$sessionUuid.$random") + else AmazonS3UriWrapper(s"$parent/$operatorName.$sessionUuid.$random") + } + protected val embulkPlugins: Seq[String] = params.getListOrEmpty("embulk_plugins", classOf[String]).asScala + + override def createRunner(): EcsTaskCommandRunner = { + EcsTaskCommandRunner( + scriptsLocationPrefix = workspaceS3UriPrefix, + script = "run.sh", + params = params, + environments = additionalEnvironments(), + awsConf = aws.conf, + logger = logger + ) + } + + override def additionalEnvironments(): Map[String, String] = { + val vars = context.getPrivilegedVariables + val builder = Map.newBuilder[String, String] + vars.getKeys.asScala.foreach { k => builder += (k -> vars.get(k)) + } + builder.result() + } + override def prepare(): Unit = { + WorkspaceWithTempDir(workspace) { tempDir: Path => + createConfigFile(tempDir) + createRunShFile(tempDir) + createWorkspaceDir(tempDir) + uploadOnS3(tempDir) + } + } + + protected def createConfigFile(parent: Path): Unit = { + val configFile: Path = Files.createFile(parent.resolve("config.yml")) + writeFile(file = configFile, content = embulkConfig) + } + + protected def createRunShFile(parent: Path): Unit = { + val dup: Config = params.deepCopy() + dup.set("ECS_TASK_EMBULK_BUCKET", workspaceS3UriPrefix.getBucket) + dup.set("ECS_TASK_EMBULK_PREFIX", workspaceS3UriPrefix.getKey) + + dup.set("ECS_TASK_EMBULK_SETUP_COMMAND", "echo 'no setup command'") // set a default value + if (embulkPlugins.nonEmpty) { + logger.warn("`embulk_plugins` option is experimental, so please be careful in the plugin update.") + val cmd: String = (Seq("embulk", "gem", "install") ++ embulkPlugins).mkString(" ") + dup.set("ECS_TASK_EMBULK_SETUP_COMMAND", cmd) + } + + TryWithResource(classOf[EcsTaskEmbulkOperator].getResourceAsStream(runShResourcePath)) { is => + val runShContentTemplate: String = Source.fromInputStream(is).mkString + val runShContent: String = templateEngine.template(runShContentTemplate, dup) + val runShFile: Path = Files.createFile(parent.resolve("run.sh")) + writeFile(file = runShFile, content = runShContent) + } + } + + protected def createWorkspaceDir(parent: Path): Unit = { + val targets: Iterator[Path] = Files.list(workspace.getPath).iterator().asScala.filterNot(_.endsWith(".digdag")) + val workspacePath: Path = Files.createDirectory(parent.resolve("workspace")) + targets.foreach { path => + logger.info(s"Copy: $path -> $workspacePath") + if (Files.isDirectory(path)) FileUtils.copyDirectoryToDirectory(path.toFile, workspacePath.toFile) + else FileUtils.copyFileToDirectory(path.toFile, workspacePath.toFile) + } + } + + protected def uploadOnS3(path: Path): Unit = { + logger.info(s"Recursive Upload: $path -> ${workspaceS3UriPrefix.getURI}") + aws.withTransferManager { xfer => + val upload = xfer.uploadDirectory( + workspaceS3UriPrefix.getBucket, + workspaceS3UriPrefix.getKey, + path.toFile, + true // includeSubdirectories + ) + upload.waitForCompletion() + } + } + + protected def writeFile(file: Path, content: String): Unit = { + logger.info(s"Write into ${file.toString}") + TryWithResource(workspace.newBufferedWriter(file.toString, UTF_8)) { writer => writer.write(content) + } + } +} From f55b9a8851da5e77cb485dd155042ccbba8b3bba Mon Sep 17 00:00:00 2001 From: Civitaspo Date: Tue, 6 Nov 2018 04:45:10 +0900 Subject: [PATCH 9/9] Ship v0.0.4 --- CHANGELOG.md | 11 +++++++++-- README.md | 2 +- build.gradle | 2 +- example/example.dig | 2 +- .../ecs_task/command/EcsTaskCommandRunner.scala | 11 +++++++++-- .../ecs_task/embulk/EcsTaskEmbulkOperator.scala | 6 ++++-- 6 files changed, 25 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4513479..d040fac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ +0.0.4 (2018-11-06) +================== + +* [Experimental] Implement `ecs_task.embulk>` operator. +* [Enhancement] Write README for scripting operators. +* [Enhancement] Make family name more configuable for scripting operators. + 0.0.3 (2018-10-30) -====================== +================== * [Breaking Change] Do not use enum parameter directory because the enums require upper camel case ( `ecs_task.{py,register,run}>` operator) * [Enhancement] Rename the configuration key: `additional_containers` to `sidecars` ( `ecs_task.py>` operator) @@ -16,7 +23,7 @@ 0.0.2 (2018-10-29) ================== -* [Experimental] Implement ecs_task.py> operator. (No document yet) +* [Experimental] Implement `ecs_task.py>` operator. (No document yet) * [Fix] Stop correctly after task run to shutdown TransferManager after processing. 0.0.1 (2018-10-23) diff --git a/README.md b/README.md index 23c5a45..1703630 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ _export: repositories: - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.3 + - pro.civitaspo:digdag-operator-ecs_task:0.0.4 ecs_task: auth_method: profile diff --git a/build.gradle b/build.gradle index 17f1010..d2be61f 100644 --- a/build.gradle +++ b/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'pro.civitaspo' -version = '0.0.3' +version = '0.0.4' def digdagVersion = '0.9.31' def scalaSemanticVersion = "2.12.6" diff --git a/example/example.dig b/example/example.dig index 73fb51e..ee50a41 100644 --- a/example/example.dig +++ b/example/example.dig @@ -4,7 +4,7 @@ _export: - file://${repos} # - https://jitpack.io dependencies: - - pro.civitaspo:digdag-operator-ecs_task:0.0.3 + - pro.civitaspo:digdag-operator-ecs_task:0.0.4 ecs_task: auth_method: profile diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala index 8fe721b..a08369b 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/command/EcsTaskCommandRunner.scala @@ -9,7 +9,14 @@ import pro.civitaspo.digdag.plugin.ecs_task.aws.AwsConf import scala.collection.JavaConverters._ -case class EcsTaskCommandRunner(scriptsLocationPrefix: AmazonS3URI, script: String, params: Config, environments: Map[String, String], awsConf: AwsConf, logger: Logger) { +case class EcsTaskCommandRunner( + scriptsLocationPrefix: AmazonS3URI, + script: String, + params: Config, + environments: Map[String, String], + awsConf: AwsConf, + logger: Logger +) { val cf: ConfigFactory = params.getFactory @@ -183,7 +190,7 @@ case class EcsTaskCommandRunner(scriptsLocationPrefix: AmazonS3URI, script: Stri c.setOptional("disable_networking", disableNetworking) c.set("dns_search_domains", dnsSearchDomains.asJava) c.set("dns_servers", dnsServers.asJava) - val additionalLabels: Map[String, String] = Map("pro.civitaspo.digdag.plugin.ecs_task.version" -> "0.0.3") + val additionalLabels: Map[String, String] = Map("pro.civitaspo.digdag.plugin.ecs_task.version" -> "0.0.4") c.set("docker_labels", (dockerLabels ++ additionalLabels).asJava) c.set("entry_point", entryPoint.asJava) c.set("environment", (configEnvironment ++ environments).asJava) diff --git a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala index cab1f80..1910c47 100644 --- a/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala +++ b/src/main/scala/pro/civitaspo/digdag/plugin/ecs_task/embulk/EcsTaskEmbulkOperator.scala @@ -55,7 +55,8 @@ class EcsTaskEmbulkOperator(operatorName: String, context: OperatorContext, syst override def additionalEnvironments(): Map[String, String] = { val vars = context.getPrivilegedVariables val builder = Map.newBuilder[String, String] - vars.getKeys.asScala.foreach { k => builder += (k -> vars.get(k)) + vars.getKeys.asScala.foreach { k => + builder += (k -> vars.get(k)) } builder.result() } @@ -118,7 +119,8 @@ class EcsTaskEmbulkOperator(operatorName: String, context: OperatorContext, syst protected def writeFile(file: Path, content: String): Unit = { logger.info(s"Write into ${file.toString}") - TryWithResource(workspace.newBufferedWriter(file.toString, UTF_8)) { writer => writer.write(content) + TryWithResource(workspace.newBufferedWriter(file.toString, UTF_8)) { writer => + writer.write(content) } } }