diff --git a/docs/configs/docsdev.js b/docs/configs/docsdev.js index 541f314645e0..2bff8b5fef99 100644 --- a/docs/configs/docsdev.js +++ b/docs/configs/docsdev.js @@ -153,6 +153,10 @@ export default { title: 'Amazon EMR', link: '/en-us/docs/dev/user_doc/guide/task/emr.html', }, + { + title: 'Amazon EMR Serverless', + link: '/en-us/docs/dev/user_doc/guide/task/emr-serverless.html', + }, { title: 'Apache Zeppelin', link: '/en-us/docs/dev/user_doc/guide/task/zeppelin.html', @@ -881,6 +885,10 @@ export default { title: 'Amazon EMR', link: '/zh-cn/docs/dev/user_doc/guide/task/emr.html', }, + { + title: 'Amazon EMR Serverless', + link: '/zh-cn/docs/dev/user_doc/guide/task/emr-serverless.html', + }, { title: 'Apache Zeppelin', link: '/zh-cn/docs/dev/user_doc/guide/task/zeppelin.html', diff --git a/docs/docs/en/guide/task/emr-serverless.md b/docs/docs/en/guide/task/emr-serverless.md new file mode 100644 index 000000000000..b661aac07a95 --- /dev/null +++ b/docs/docs/en/guide/task/emr-serverless.md @@ -0,0 +1,144 @@ +# Amazon EMR Serverless + +## Overview + +Amazon EMR Serverless task type, for submitting and monitoring job runs on [Amazon EMR Serverless](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/emr-serverless.html) applications. +Unlike traditional EMR on EC2, EMR Serverless requires no cluster infrastructure management and automatically scales compute resources on demand, suitable for Spark and Hive workloads. + +Using [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) in the background code, to transfer JSON parameters to a [StartJobRunRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/emrserverless/model/StartJobRunRequest.html) object +and submit it to AWS via the [StartJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_StartJobRun.html), then poll job status via the [GetJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_GetJobRun.html) until completion. + +## Create Task + +- Click `Project Management -> Project Name -> Workflow Definition`, click the `Create Workflow` button to enter the DAG editing page. +- Drag `AmazonEMRServerless` task from the toolbar to the artboard to complete the creation. + +## Task Parameters + +[//]: # (TODO: use the commented anchor below once our website template supports this syntax) +[//]: # (- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md#default-task-parameters) `Default Task Parameters` section for default parameters.) + +- Please refer to [DolphinScheduler Task Parameters Appendix](appendix.md) `Default Task Parameters` section for default parameters. + +| **Parameter** | **Description** | +|-------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Application Id | EMR Serverless application ID (e.g. `00fkht2eodujab09`), obtainable from the [EMR Serverless Console](https://console.aws.amazon.com/emr/home#/serverless) | +| Execution Role Arn | ARN of the IAM role for job execution (e.g. `arn:aws:iam::123456789012:role/EMRServerlessRole`), this role needs permissions to access S3, Glue, and other services | +| Job Name | Job name (optional), used to identify the job in the EMR Serverless console | +| StartJobRunRequest JSON | JSON corresponding to the `JobDriver` and `ConfigurationOverrides` portions of the [StartJobRunRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/emrserverless/model/StartJobRunRequest.html), see examples below. **Note**: `ApplicationId` and `ExecutionRoleArn` do not need to be included in the JSON as they are automatically injected from the form parameters above | + +![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_serverless_create.png) + +## Task Example + +### Submit a Spark Job + +This example shows how to create an `EMR_SERVERLESS` task node to submit a Spark job to an EMR Serverless application. + +StartJobRunRequest JSON example (Spark): + +```json +{ + "JobDriver": { + "SparkSubmit": { + "EntryPoint": "s3://my-bucket/scripts/my-spark-job.jar", + "EntryPointArguments": [ + "s3://my-bucket/input/", + "s3://my-bucket/output/" + ], + "SparkSubmitParameters": "--class com.example.MySparkApp --conf spark.executor.cores=4 --conf spark.executor.memory=8g --conf spark.executor.instances=10" + } + }, + "ConfigurationOverrides": { + "MonitoringConfiguration": { + "S3MonitoringConfiguration": { + "LogUri": "s3://my-bucket/emr-serverless-logs/" + } + } + } +} +``` + +### Submit a Hive Job + +This example shows how to create an `EMR_SERVERLESS` task node to submit a Hive query job. + +StartJobRunRequest JSON example (Hive): + +```json +{ + "JobDriver": { + "HiveSQL": { + "Query": "s3://my-bucket/scripts/my-hive-query.sql", + "Parameters": "--hiveconf hive.exec.dynamic.partition=true --hiveconf hive.exec.dynamic.partition.mode=nonstrict" + } + }, + "ConfigurationOverrides": { + "MonitoringConfiguration": { + "S3MonitoringConfiguration": { + "LogUri": "s3://my-bucket/emr-serverless-logs/" + } + }, + "ApplicationConfiguration": [ + { + "Classification": "hive-site", + "Properties": { + "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + } + } + ] + } +} +``` + +## AWS Authentication Configuration + +The EMR Serverless task reads AWS credentials from the DolphinScheduler `aws.yaml` configuration file, under the `aws.emr` section at `conf/aws.yaml`. + +### Using IAM Role (Recommended) + +If the DolphinScheduler Worker node runs on an EC2 instance with an attached IAM Role: + +```yaml +aws: + emr: + credentials.provider.type: InstanceProfileCredentialsProvider + region: us-east-1 +``` + +### Using Access Key + +If you need to authenticate using AK/SK: + +```yaml +aws: + emr: + credentials.provider.type: AWSStaticCredentialsProvider + access.key.id: your-access-key-id + access.key.secret: your-secret-access-key + region: us-east-1 +``` + +> **Note**: The `aws.emr` section configuration is shared by both EMR on EC2 and EMR Serverless task types. + +## Job State Transitions + +After an EMR Serverless job is submitted, DolphinScheduler polls the job status every 10 seconds: + +``` +SUBMITTED → PENDING → SCHEDULED → RUNNING → SUCCESS + → FAILED + → CANCELLED +``` + +- When a job reaches `SUCCESS` state, the task is marked as successful +- When a job reaches `FAILED` or `CANCELLED` state, the task is marked as failed +- If a DolphinScheduler task is killed, it automatically calls the [CancelJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_CancelJobRun.html) to cancel the running job + +## Notice + +- The **Application Id** must correspond to a pre-existing EMR Serverless application (created via the AWS Console or API) in `STARTED` or `CREATED` state +- The **Execution Role** requires the following minimum permissions: `emr-serverless:StartJobRun`, `emr-serverless:GetJobRun`, `emr-serverless:CancelJobRun`, plus S3, Glue and other data access permissions required by the job +- `StartJobRunRequest JSON` should NOT include `ApplicationId` or `ExecutionRoleArn` fields — they are automatically injected from the form parameters +- EMR Serverless task supports failover: when a Worker node fails, a new Worker can recover tracking of running jobs through `appIds` (the `jobRunId`) + diff --git a/docs/docs/zh/guide/task/emr-serverless.md b/docs/docs/zh/guide/task/emr-serverless.md new file mode 100644 index 000000000000..92ce54a84ade --- /dev/null +++ b/docs/docs/zh/guide/task/emr-serverless.md @@ -0,0 +1,144 @@ +# Amazon EMR Serverless + +## 综述 + +Amazon EMR Serverless 任务类型,用于向 [Amazon EMR Serverless](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/emr-serverless.html) 应用程序提交并监控作业运行。 +与传统的 EMR on EC2 不同,EMR Serverless 无需管理集群基础设施,按需自动扩缩容计算资源,适用于 Spark 和 Hive 工作负载。 + +后台使用 [aws-java-sdk](https://aws.amazon.com/cn/sdk-for-java/) 将 JSON 参数转换为 [StartJobRunRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/emrserverless/model/StartJobRunRequest.html) 对象, +通过 [StartJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_StartJobRun.html) 提交到 AWS,并通过 [GetJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_GetJobRun.html) 轮询作业状态直到完成。 + +## 创建任务 + +- 点击 `项目管理 -> 项目名称 -> 工作流定义`,点击 `创建工作流` 按钮进入 DAG 编辑页面。 +- 从工具栏中拖拽 `AmazonEMRServerless` 任务到画布中完成创建。 + +## 任务参数 + +[//]: # (TODO: use the commented anchor below once our website template supports this syntax) +[//]: # (- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md#默认任务参数)`默认任务参数`一栏。) + +- 默认参数说明请参考[DolphinScheduler任务参数附录](appendix.md)`默认任务参数`一栏。 + +| **任务参数** | **描述** | +|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Application Id | EMR Serverless 应用程序 ID(格式如 `00fkht2eodujab09`),可在 [EMR Serverless 控制台](https://console.aws.amazon.com/emr/home#/serverless) 获取 | +| Execution Role Arn | 作业执行 IAM 角色的 ARN(格式如 `arn:aws:iam::123456789012:role/EMRServerlessRole`),该角色需要有访问 S3、Glue 等服务的权限 | +| Job Name | 作业名称(可选),用于在 EMR Serverless 控制台中标识作业 | +| StartJobRunRequest JSON | [StartJobRunRequest](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/emrserverless/model/StartJobRunRequest.html) 中 `JobDriver` 和 `ConfigurationOverrides` 部分对应的 JSON,详细定义见下方示例。**注意**:`ApplicationId` 和 `ExecutionRoleArn` 无需在 JSON 中重复填写,系统会自动从上方参数注入 | + +![RUN_JOB_FLOW](../../../../img/tasks/demo/emr_serverless_create.png) + +## 任务样例 + +### 提交 Spark 作业 + +该样例展示了如何创建 `EMR_SERVERLESS` 任务节点来提交一个 Spark 作业到 EMR Serverless 应用程序。 + +StartJobRunRequest JSON 参数样例(Spark): + +```json +{ + "JobDriver": { + "SparkSubmit": { + "EntryPoint": "s3://my-bucket/scripts/my-spark-job.jar", + "EntryPointArguments": [ + "s3://my-bucket/input/", + "s3://my-bucket/output/" + ], + "SparkSubmitParameters": "--class com.example.MySparkApp --conf spark.executor.cores=4 --conf spark.executor.memory=8g --conf spark.executor.instances=10" + } + }, + "ConfigurationOverrides": { + "MonitoringConfiguration": { + "S3MonitoringConfiguration": { + "LogUri": "s3://my-bucket/emr-serverless-logs/" + } + } + } +} +``` + +### 提交 Hive 作业 + +该样例展示了如何创建 `EMR_SERVERLESS` 任务节点来提交一个 Hive 查询作业。 + +StartJobRunRequest JSON 参数样例(Hive): + +```json +{ + "JobDriver": { + "HiveSQL": { + "Query": "s3://my-bucket/scripts/my-hive-query.sql", + "Parameters": "--hiveconf hive.exec.dynamic.partition=true --hiveconf hive.exec.dynamic.partition.mode=nonstrict" + } + }, + "ConfigurationOverrides": { + "MonitoringConfiguration": { + "S3MonitoringConfiguration": { + "LogUri": "s3://my-bucket/emr-serverless-logs/" + } + }, + "ApplicationConfiguration": [ + { + "Classification": "hive-site", + "Properties": { + "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory" + } + } + ] + } +} +``` + +## AWS 认证配置 + +EMR Serverless 任务通过 DolphinScheduler 的 `aws.yaml` 配置文件读取 AWS 认证信息,配置路径为 `conf/aws.yaml` 中的 `aws.emr` 段。 + +### 使用 IAM Role(推荐) + +如果 DolphinScheduler Worker 节点运行在 EC2 实例上并已绑定 IAM Role,配置如下: + +```yaml +aws: + emr: + credentials.provider.type: InstanceProfileCredentialsProvider + region: us-east-1 +``` + +### 使用 Access Key + +如果需要使用 AK/SK 方式认证: + +```yaml +aws: + emr: + credentials.provider.type: AWSStaticCredentialsProvider + access.key.id: your-access-key-id + access.key.secret: your-secret-access-key + region: us-east-1 +``` + +> **注意**:`aws.emr` 段的配置同时被 EMR on EC2 和 EMR Serverless 任务类型共享。 + +## 作业状态流转 + +EMR Serverless 作业提交后,DolphinScheduler 会每 10 秒轮询一次作业状态: + +``` +SUBMITTED → PENDING → SCHEDULED → RUNNING → SUCCESS + → FAILED + → CANCELLED +``` + +- 作业进入 `SUCCESS` 状态时,任务标记为成功 +- 作业进入 `FAILED` 或 `CANCELLED` 状态时,任务标记为失败 +- 如果 DolphinScheduler 任务被终止,会自动调用 [CancelJobRun API](https://docs.aws.amazon.com/emr-serverless/latest/APIReference/API_CancelJobRun.html) 取消正在运行的作业 + +## 注意事项 + +- **Application Id** 对应的 EMR Serverless 应用程序需要预先在 AWS 控制台或通过 API 创建,并确保处于 `STARTED` 或 `CREATED` 状态 +- **Execution Role** 需要有以下最小权限:`emr-serverless:StartJobRun`、`emr-serverless:GetJobRun`、`emr-serverless:CancelJobRun`,以及作业所需的 S3、Glue 等数据访问权限 +- `StartJobRunRequest JSON` 中无需填写 `ApplicationId` 和 `ExecutionRoleArn` 字段,系统会自动从表单参数注入 +- EMR Serverless 任务支持故障转移(Failover):当 Worker 节点发生故障时,新的 Worker 可以通过 `appIds`(即 `jobRunId`)恢复对正在运行作业的跟踪 + diff --git a/docs/img/tasks/demo/emr_serverless_create.png b/docs/img/tasks/demo/emr_serverless_create.png new file mode 100644 index 000000000000..ba13ba153e27 Binary files /dev/null and b/docs/img/tasks/demo/emr_serverless_create.png differ diff --git a/dolphinscheduler-api/src/main/resources/task-type-config.yaml b/dolphinscheduler-api/src/main/resources/task-type-config.yaml index 51bbae7e9abd..e95d4190fc60 100644 --- a/dolphinscheduler-api/src/main/resources/task-type-config.yaml +++ b/dolphinscheduler-api/src/main/resources/task-type-config.yaml @@ -33,6 +33,7 @@ task: - 'REMOTESHELL' cloud: - 'EMR' + - 'EMR_SERVERLESS' - 'K8S' - 'DMS' - 'DATA_FACTORY' diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index 21a1e7d96152..22f5e23a3be2 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -752,6 +752,11 @@ aws-java-sdk-emr ${aws-sdk.version} + + com.amazonaws + aws-java-sdk-emrserverless + ${aws-sdk.version} + com.amazonaws aws-java-sdk-s3 diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml index 8b616170b395..f3e6f8855568 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-all/pom.xml @@ -112,6 +112,12 @@ ${project.version} + + org.apache.dolphinscheduler + dolphinscheduler-task-emr-serverless + ${project.version} + + org.apache.dolphinscheduler dolphinscheduler-task-zeppelin diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/pom.xml b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/pom.xml new file mode 100644 index 000000000000..81a98a92cf25 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/pom.xml @@ -0,0 +1,77 @@ + + + + 4.0.0 + + org.apache.dolphinscheduler + dolphinscheduler-task-plugin + dev-SNAPSHOT + + + dolphinscheduler-task-emr-serverless + jar + + + task.emr.serverless + + + + + org.apache.dolphinscheduler + dolphinscheduler-task-api + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-aws-authentication + provided + + + + org.apache.dolphinscheduler + dolphinscheduler-common + ${project.version} + provided + + + + com.amazonaws + aws-java-sdk-emrserverless + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + shade + + package + + + + + + + diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessParameters.java new file mode 100644 index 000000000000..bda8faf49bf6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessParameters.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@EqualsAndHashCode(callSuper = true) +public class EmrServerlessParameters extends AbstractParameters { + + /** + * EMR Serverless Application ID (required). + * The application must be in STARTED or CREATED state. + */ + private String applicationId; + + /** + * IAM execution role ARN for the job run (required). + */ + private String executionRoleArn; + + /** + * Optional job name. If empty, defaults to the task name. + */ + private String jobName; + + /** + * StartJobRun request JSON defining jobDriver and configurationOverrides. + * The applicationId and executionRoleArn fields in this JSON will be overridden + * by the top-level parameters above. + * + * @see API_StartJobRun + */ + private String startJobRunRequestJson; + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(applicationId) + && StringUtils.isNotEmpty(executionRoleArn) + && StringUtils.isNotEmpty(startJobRunRequestJson); + } + + @Override + public List getResourceFilesList() { + return Collections.emptyList(); + } + + @Override + public String toString() { + return "EmrServerlessParameters{" + + "applicationId='" + applicationId + '\'' + + ", executionRoleArn='" + executionRoleArn + '\'' + + ", jobName='" + jobName + '\'' + + ", startJobRunRequestJson='" + startJobRunRequestJson + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTask.java new file mode 100644 index 000000000000..3ed3862823de --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTask.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL; +import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS; + +import org.apache.dolphinscheduler.authentication.aws.AWSCredentialsProviderFactor; +import org.apache.dolphinscheduler.authentication.aws.AwsConfigurationKeys; +import org.apache.dolphinscheduler.common.constants.SystemConstants; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +import com.amazonaws.SdkBaseException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.services.emrserverless.AWSEMRServerless; +import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder; +import com.amazonaws.services.emrserverless.model.CancelJobRunRequest; +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunRequest; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.StartJobRunRequest; +import com.amazonaws.services.emrserverless.model.StartJobRunResult; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.google.common.collect.Sets; + +/** + * Amazon EMR Serverless Task. + *

+ * Submits a job run to an EMR Serverless application and tracks it until completion. + * Supports Spark and Hive job types. + *

+ * + * @since dev-SNAPSHOT + */ +@Slf4j +public class EmrServerlessTask extends AbstractRemoteTask { + + /** + * EMR Serverless job run states that indicate the job is still in progress. + */ + private static final HashSet WAITING_STATES = Sets.newHashSet( + "SUBMITTED", "PENDING", "SCHEDULED", "RUNNING"); + + /** + * ObjectMapper configured for AWS SDK request/response deserialization. + */ + static final ObjectMapper objectMapper = new ObjectMapper() + .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) + .configure(REQUIRE_SETTERS_FOR_GETTERS, true) + .setTimeZone(SystemConstants.DEFAULT_TIME_ZONE) + .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()); + + private final TaskExecutionContext taskExecutionContext; + + private EmrServerlessParameters emrServerlessParameters; + + private AWSEMRServerless emrServerlessClient; + + /** + * applicationId from parameters + */ + private String applicationId; + + /** + * jobRunId returned by StartJobRun or recovered from appIds + */ + private String jobRunId; + + protected EmrServerlessTask(TaskExecutionContext taskExecutionContext) { + super(taskExecutionContext); + this.taskExecutionContext = taskExecutionContext; + } + + @Override + public void init() { + final String taskParams = taskExecutionContext.getTaskParams(); + emrServerlessParameters = JSONUtils.parseObject(taskParams, EmrServerlessParameters.class); + log.info("Initialize EMR Serverless task params: {}", JSONUtils.toPrettyJsonString(taskParams)); + + if (emrServerlessParameters == null || !emrServerlessParameters.checkParameters()) { + throw new EmrServerlessTaskException("EMR Serverless task params are not valid"); + } + + applicationId = emrServerlessParameters.getApplicationId(); + emrServerlessClient = createEmrServerlessClient(); + } + + @Override + public void submitApplication() throws TaskException { + try { + StartJobRunRequest request = buildStartJobRunRequest(); + + log.info("Submitting EMR Serverless job run to application: {}", applicationId); + StartJobRunResult result = emrServerlessClient.startJobRun(request); + + jobRunId = result.getJobRunId(); + // Store applicationId:jobRunId for failover recovery + setAppIds(applicationId + ":" + jobRunId); + log.info("Successfully submitted EMR Serverless job run, jobRunId: {}", jobRunId); + + } catch (EmrServerlessTaskException | SdkBaseException e) { + log.error("EMR Serverless task submit failed", e); + throw new TaskException("EMR Serverless task submit failed", e); + } + } + + @Override + public void trackApplicationStatus() throws TaskException { + try { + // Recover applicationId and jobRunId from appIds if needed (failover case) + if (StringUtils.isEmpty(jobRunId) && StringUtils.isNotEmpty(getAppIds())) { + String[] parts = getAppIds().split(":"); + if (parts.length == 2) { + applicationId = parts[0]; + jobRunId = parts[1]; + log.info("Recovered EMR Serverless job from appIds - applicationId: {}, jobRunId: {}", + applicationId, jobRunId); + } + } + + if (StringUtils.isEmpty(jobRunId)) { + throw new EmrServerlessTaskException("jobRunId is empty, cannot track application status"); + } + + String currentState = getJobRunState(); + while (WAITING_STATES.contains(currentState)) { + TimeUnit.SECONDS.sleep(10); + currentState = getJobRunState(); + } + + final int exitCode = mapStateToExitCode(currentState); + setExitStatusCode(exitCode); + log.info("EMR Serverless job run [{}] finished with state: {}, exitCode: {}", + jobRunId, currentState, exitCode); + + } catch (EmrServerlessTaskException | SdkBaseException e) { + log.error("EMR Serverless task tracking failed", e); + setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TaskException("EMR Serverless task tracking interrupted", e); + } + } + + @Override + public void cancelApplication() throws TaskException { + if (StringUtils.isEmpty(jobRunId)) { + log.warn("jobRunId is empty, skip cancel"); + return; + } + log.info("Cancelling EMR Serverless job run, applicationId: {}, jobRunId: {}", applicationId, jobRunId); + try { + CancelJobRunRequest request = new CancelJobRunRequest() + .withApplicationId(applicationId) + .withJobRunId(jobRunId); + CancelJobRunResult result = emrServerlessClient.cancelJobRun(request); + log.info("Cancel job run result: {}", result); + } catch (SdkBaseException e) { + throw new TaskException("Failed to cancel EMR Serverless job run", e); + } + } + + @Override + public List getApplicationIds() throws TaskException { + return Collections.emptyList(); + } + + @Override + public AbstractParameters getParameters() { + return emrServerlessParameters; + } + + /** + * Build StartJobRunRequest from parameters and user-provided JSON. + */ + private StartJobRunRequest buildStartJobRunRequest() { + String startJobRunRequestJson; + try { + startJobRunRequestJson = ParameterUtils.convertParameterPlaceholders( + emrServerlessParameters.getStartJobRunRequestJson(), + ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())); + } catch (Exception e) { + throw new EmrServerlessTaskException("Failed to resolve parameter placeholders", e); + } + + StartJobRunRequest request; + try { + request = objectMapper.readValue(startJobRunRequestJson, StartJobRunRequest.class); + } catch (JsonProcessingException e) { + throw new EmrServerlessTaskException( + "Cannot parse StartJobRunRequest from JSON: " + startJobRunRequestJson, e); + } + + // Override applicationId and executionRoleArn from top-level parameters + request.setApplicationId(applicationId); + request.setExecutionRoleArn(emrServerlessParameters.getExecutionRoleArn()); + + // Set job name if provided + if (StringUtils.isNotEmpty(emrServerlessParameters.getJobName())) { + request.setName(emrServerlessParameters.getJobName()); + } else { + request.setName(taskExecutionContext.getTaskName()); + } + + // Set client token for idempotency + request.setClientToken(taskExecutionContext.getTaskInstanceId() + "-" + System.currentTimeMillis()); + + return request; + } + + /** + * Get the current state of the job run. + */ + private String getJobRunState() { + GetJobRunRequest request = new GetJobRunRequest() + .withApplicationId(applicationId) + .withJobRunId(jobRunId); + GetJobRunResult result = emrServerlessClient.getJobRun(request); + + if (result == null || result.getJobRun() == null) { + throw new EmrServerlessTaskException("Failed to get job run status"); + } + + JobRun jobRun = result.getJobRun(); + String state = jobRun.getState(); + log.info("EMR Serverless job run [applicationId:{}, jobRunId:{}] state: {}", applicationId, jobRunId, state); + return state; + } + + /** + * Map EMR Serverless job run final state to DolphinScheduler exit code. + */ + private int mapStateToExitCode(String state) { + if (state == null) { + return TaskConstants.EXIT_CODE_FAILURE; + } + switch (state) { + case "SUCCESS": + return TaskConstants.EXIT_CODE_SUCCESS; + case "CANCELLED": + return TaskConstants.EXIT_CODE_KILL; + case "FAILED": + default: + return TaskConstants.EXIT_CODE_FAILURE; + } + } + + /** + * Create EMR Serverless client. + * Strategy: try aws.emr.* config first, fallback to DefaultAWSCredentialsProviderChain. + */ + protected AWSEMRServerless createEmrServerlessClient() { + Map awsProperties = PropertyUtils.getByPrefix("aws.emr.", ""); + AWSEMRServerlessClientBuilder builder = AWSEMRServerlessClientBuilder.standard(); + + AWSCredentialsProvider credentialsProvider; + try { + credentialsProvider = AWSCredentialsProviderFactor.credentialsProvider(awsProperties); + log.info("Using AWS credentials from aws.emr.* configuration"); + } catch (Exception e) { + log.info("No valid aws.emr.* credentials config found, falling back to DefaultAWSCredentialsProviderChain"); + credentialsProvider = DefaultAWSCredentialsProviderChain.getInstance(); + } + builder.withCredentials(credentialsProvider); + + String region = awsProperties.get(AwsConfigurationKeys.AWS_REGION); + + // Note: unlike S3, EMR Serverless has no local emulator, so we ignore + // the endpoint configuration (which may point to a local MinIO/S3 mock) + // and always use the standard AWS EMR Serverless endpoint resolved by region. + if (StringUtils.isNotEmpty(region)) { + builder.withRegion(region); + } + + return builder.build(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannel.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannel.java new file mode 100644 index 000000000000..a63cea6bbb6a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannel.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; + +public class EmrServerlessTaskChannel implements TaskChannel { + + @Override + public AbstractTask createTask(TaskExecutionContext taskRequest) { + return new EmrServerlessTask(taskRequest); + } + + @Override + public AbstractParameters parseParameters(String taskParams) { + return JSONUtils.parseObject(taskParams, EmrServerlessParameters.class); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannelFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannelFactory.java new file mode 100644 index 000000000000..74e4b0e7ccab --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskChannelFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; +import org.apache.dolphinscheduler.plugin.task.api.TaskChannelFactory; + +import com.google.auto.service.AutoService; + +@AutoService(TaskChannelFactory.class) +public class EmrServerlessTaskChannelFactory implements TaskChannelFactory { + + @Override + public String getName() { + return "EMR_SERVERLESS"; + } + + @Override + public TaskChannel create() { + return new EmrServerlessTaskChannel(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskException.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskException.java new file mode 100644 index 000000000000..ea46565b6981 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/main/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import org.apache.dolphinscheduler.plugin.task.api.TaskException; + +public class EmrServerlessTaskException extends TaskException { + + public EmrServerlessTaskException(String message) { + super(message); + } + + public EmrServerlessTaskException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskTest.java new file mode 100644 index 000000000000..eee1b77c13e6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/java/org/apache/dolphinscheduler/plugin/task/emrserverless/EmrServerlessTaskTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.emrserverless; + +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import static org.mockito.ArgumentMatchers.any; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; + +import org.apache.commons.io.IOUtils; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.amazonaws.services.emrserverless.AWSEMRServerless; +import com.amazonaws.services.emrserverless.model.AWSEMRServerlessException; +import com.amazonaws.services.emrserverless.model.CancelJobRunResult; +import com.amazonaws.services.emrserverless.model.GetJobRunResult; +import com.amazonaws.services.emrserverless.model.JobRun; +import com.amazonaws.services.emrserverless.model.StartJobRunResult; + +@ExtendWith(MockitoExtension.class) +public class EmrServerlessTaskTest { + + private static final String APPLICATION_ID = "00abcdefgh123456"; + private static final String JOB_RUN_ID = "00abcdefgh123456-jobrun-001"; + private static final String EXECUTION_ROLE_ARN = "arn:aws:iam::123456789012:role/EMRServerlessRole"; + + private EmrServerlessTask emrServerlessTask; + private AWSEMRServerless emrServerlessClient; + + private final TaskCallBack taskCallBack = new TaskCallBack() { + + @Override + public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { + } + + @Override + public void updateTaskInstanceInfo(int taskInstanceId) { + } + }; + + @BeforeEach + public void before() throws Exception { + String taskParams = buildEmrServerlessTaskParameters(); + TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); + Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams); + Mockito.lenient().when(taskExecutionContext.getTaskName()).thenReturn("test-emr-serverless-task"); + Mockito.lenient().when(taskExecutionContext.getTaskInstanceId()).thenReturn(1); + + emrServerlessTask = Mockito.spy(new EmrServerlessTask(taskExecutionContext)); + + // mock emrServerlessClient + emrServerlessClient = Mockito.mock(AWSEMRServerless.class); + + // mock startJobRun + StartJobRunResult startJobRunResult = Mockito.mock(StartJobRunResult.class); + Mockito.lenient().when(emrServerlessClient.startJobRun(any())).thenReturn(startJobRunResult); + Mockito.lenient().when(startJobRunResult.getJobRunId()).thenReturn(JOB_RUN_ID); + + // inject mock client + Mockito.doReturn(emrServerlessClient).when(emrServerlessTask).createEmrServerlessClient(); + + emrServerlessTask.init(); + } + + @Test + public void testHandleSuccess() throws Exception { + // Job goes: SUBMITTED -> RUNNING -> SUCCESS + mockJobRunStates("SUBMITTED", "RUNNING", "SUCCESS"); + + emrServerlessTask.handle(taskCallBack); + Assertions.assertEquals(EXIT_CODE_SUCCESS, emrServerlessTask.getExitStatusCode()); + } + + @Test + public void testHandleFailed() throws Exception { + // Job goes: SUBMITTED -> RUNNING -> FAILED + mockJobRunStates("SUBMITTED", "RUNNING", "FAILED"); + + emrServerlessTask.handle(taskCallBack); + Assertions.assertEquals(EXIT_CODE_FAILURE, emrServerlessTask.getExitStatusCode()); + } + + @Test + public void testHandleCancelled() throws Exception { + // Job goes: SUBMITTED -> RUNNING -> CANCELLED + mockJobRunStates("SUBMITTED", "RUNNING", "CANCELLED"); + + emrServerlessTask.handle(taskCallBack); + Assertions.assertEquals(EXIT_CODE_KILL, emrServerlessTask.getExitStatusCode()); + } + + @Test + public void testHandleFullLifecycle() throws Exception { + // Job goes through all intermediate states: SUBMITTED -> PENDING -> SCHEDULED -> RUNNING -> SUCCESS + mockJobRunStates("SUBMITTED", "PENDING", "SCHEDULED", "RUNNING", "SUCCESS"); + + emrServerlessTask.handle(taskCallBack); + Assertions.assertEquals(EXIT_CODE_SUCCESS, emrServerlessTask.getExitStatusCode()); + } + + @Test + public void testSubmitError() throws Exception { + Mockito.when(emrServerlessClient.startJobRun(any())) + .thenThrow(new AWSEMRServerlessException("Access denied")); + + Assertions.assertThrows(TaskException.class, () -> { + emrServerlessTask.handle(taskCallBack); + }); + } + + @Test + public void testGetJobRunReturnsNull() throws Exception { + // First call for submit (need a valid startJobRun response) + StartJobRunResult startResult = Mockito.mock(StartJobRunResult.class); + Mockito.when(emrServerlessClient.startJobRun(any())).thenReturn(startResult); + Mockito.when(startResult.getJobRunId()).thenReturn(JOB_RUN_ID); + + // getJobRun returns null + Mockito.when(emrServerlessClient.getJobRun(any())).thenReturn(null); + + emrServerlessTask.handle(taskCallBack); + Assertions.assertEquals(EXIT_CODE_FAILURE, emrServerlessTask.getExitStatusCode()); + } + + @Test + public void testCancelApplication() throws Exception { + // Submit first so we have a jobRunId + mockJobRunStates("SUBMITTED", "RUNNING", "SUCCESS"); + emrServerlessTask.handle(taskCallBack); + + // Now test cancel + CancelJobRunResult cancelResult = Mockito.mock(CancelJobRunResult.class); + Mockito.when(emrServerlessClient.cancelJobRun(any())).thenReturn(cancelResult); + emrServerlessTask.cancelApplication(); + + Mockito.verify(emrServerlessClient).cancelJobRun(any()); + } + + @Test + public void testCancelWithEmptyJobRunId() throws Exception { + // Don't submit, so jobRunId is empty — cancel should be a no-op + emrServerlessTask.cancelApplication(); + Mockito.verify(emrServerlessClient, Mockito.never()).cancelJobRun(any()); + } + + @Test + public void testFailoverRecovery() throws Exception { + // Simulate failover: submit the job first + mockJobRunStates("SUBMITTED", "RUNNING", "SUCCESS"); + emrServerlessTask.handle(taskCallBack); + + // Now create a new task instance simulating failover + String taskParams = buildEmrServerlessTaskParameters(); + TaskExecutionContext failoverContext = Mockito.mock(TaskExecutionContext.class); + Mockito.when(failoverContext.getTaskParams()).thenReturn(taskParams); + Mockito.lenient().when(failoverContext.getTaskName()).thenReturn("test-emr-serverless-failover"); + Mockito.lenient().when(failoverContext.getTaskInstanceId()).thenReturn(2); + // Simulate that appIds was persisted from previous run + Mockito.when(failoverContext.getAppIds()).thenReturn(APPLICATION_ID + ":" + JOB_RUN_ID); + + EmrServerlessTask failoverTask = Mockito.spy(new EmrServerlessTask(failoverContext)); + Mockito.doReturn(emrServerlessClient).when(failoverTask).createEmrServerlessClient(); + + // Reset getJobRun mock for the recovery path — job already completed + mockJobRunStates("SUCCESS"); + + failoverTask.init(); + failoverTask.handle(taskCallBack); + + // Should recover and succeed without calling startJobRun again + Assertions.assertEquals(EXIT_CODE_SUCCESS, failoverTask.getExitStatusCode()); + } + + @Test + public void testParametersCheck() { + EmrServerlessParameters params = new EmrServerlessParameters(); + + // All empty — should fail + Assertions.assertFalse(params.checkParameters()); + + // Only applicationId — should fail + params.setApplicationId(APPLICATION_ID); + Assertions.assertFalse(params.checkParameters()); + + // applicationId + executionRoleArn — should fail (no JSON) + params.setExecutionRoleArn(EXECUTION_ROLE_ARN); + Assertions.assertFalse(params.checkParameters()); + + // All three — should pass + params.setStartJobRunRequestJson("{}"); + Assertions.assertTrue(params.checkParameters()); + } + + @Test + public void testInvalidJson() throws Exception { + // Build params with invalid JSON + EmrServerlessParameters params = new EmrServerlessParameters(); + params.setApplicationId(APPLICATION_ID); + params.setExecutionRoleArn(EXECUTION_ROLE_ARN); + params.setStartJobRunRequestJson("{invalid json!!!}"); + + TaskExecutionContext ctx = Mockito.mock(TaskExecutionContext.class); + Mockito.when(ctx.getTaskParams()).thenReturn(JSONUtils.toJsonString(params)); + Mockito.lenient().when(ctx.getTaskName()).thenReturn("test-bad-json"); + Mockito.lenient().when(ctx.getTaskInstanceId()).thenReturn(99); + + EmrServerlessTask badJsonTask = Mockito.spy(new EmrServerlessTask(ctx)); + Mockito.doReturn(emrServerlessClient).when(badJsonTask).createEmrServerlessClient(); + + badJsonTask.init(); + Assertions.assertThrows(TaskException.class, () -> { + badJsonTask.handle(taskCallBack); + }); + } + + // --- Helper methods --- + + private void mockJobRunStates(String... states) { + if (states.length == 0) { + return; + } + + GetJobRunResult[] results = new GetJobRunResult[states.length]; + for (int i = 0; i < states.length; i++) { + GetJobRunResult result = Mockito.mock(GetJobRunResult.class); + JobRun jobRun = Mockito.mock(JobRun.class); + Mockito.when(result.getJobRun()).thenReturn(jobRun); + Mockito.when(jobRun.getState()).thenReturn(states[i]); + results[i] = result; + } + + if (results.length == 1) { + Mockito.when(emrServerlessClient.getJobRun(any())).thenReturn(results[0]); + } else { + GetJobRunResult first = results[0]; + GetJobRunResult[] rest = new GetJobRunResult[results.length - 1]; + System.arraycopy(results, 1, rest, 0, rest.length); + Mockito.when(emrServerlessClient.getJobRun(any())).thenReturn(first, rest); + } + } + + private String buildEmrServerlessTaskParameters() { + EmrServerlessParameters params = new EmrServerlessParameters(); + params.setApplicationId(APPLICATION_ID); + params.setExecutionRoleArn(EXECUTION_ROLE_ARN); + + String startJobRunRequestJson; + try (InputStream is = this.getClass().getResourceAsStream("StartJobRunRequest.json")) { + assert is != null; + startJobRunRequestJson = IOUtils.toString(is, StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + params.setStartJobRunRequestJson(startJobRunRequestJson); + + return JSONUtils.toJsonString(params); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/resources/org/apache/dolphinscheduler/plugin/task/emrserverless/StartJobRunRequest.json b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/resources/org/apache/dolphinscheduler/plugin/task/emrserverless/StartJobRunRequest.json new file mode 100644 index 000000000000..36386900a83a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr-serverless/src/test/resources/org/apache/dolphinscheduler/plugin/task/emrserverless/StartJobRunRequest.json @@ -0,0 +1,16 @@ +{ + "JobDriver": { + "SparkSubmit": { + "EntryPoint": "s3://my-bucket/scripts/spark-etl.py", + "EntryPointArguments": ["--input", "s3://data/raw", "--output", "s3://data/processed"], + "SparkSubmitParameters": "--conf spark.executor.cores=4 --conf spark.executor.memory=16g --conf spark.executor.instances=10" + } + }, + "ConfigurationOverrides": { + "MonitoringConfiguration": { + "S3MonitoringConfiguration": { + "LogUri": "s3://my-bucket/emr-serverless-logs/" + } + } + } +} diff --git a/dolphinscheduler-task-plugin/pom.xml b/dolphinscheduler-task-plugin/pom.xml index 6f3d727d59b9..830650f50f55 100644 --- a/dolphinscheduler-task-plugin/pom.xml +++ b/dolphinscheduler-task-plugin/pom.xml @@ -62,6 +62,7 @@ dolphinscheduler-task-datafactory dolphinscheduler-task-remoteshell dolphinscheduler-task-aliyunserverlessspark + dolphinscheduler-task-emr-serverless diff --git a/dolphinscheduler-ui/public/images/task-icons/emr_serverless.png b/dolphinscheduler-ui/public/images/task-icons/emr_serverless.png new file mode 100644 index 000000000000..40fca7c961c8 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/emr_serverless.png differ diff --git a/dolphinscheduler-ui/public/images/task-icons/emr_serverless_hover.png b/dolphinscheduler-ui/public/images/task-icons/emr_serverless_hover.png new file mode 100644 index 000000000000..59acf7a114d1 Binary files /dev/null and b/dolphinscheduler-ui/public/images/task-icons/emr_serverless_hover.png differ diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts b/dolphinscheduler-ui/src/locales/en_US/project.ts index 59edc7dfeaed..e45e6bc50253 100644 --- a/dolphinscheduler-ui/src/locales/en_US/project.ts +++ b/dolphinscheduler-ui/src/locales/en_US/project.ts @@ -741,6 +741,18 @@ export default { emr_flow_define_json_tips: 'Please enter the definition of the job flow.', emr_steps_define_json: 'stepsDefineJson', emr_steps_define_json_tips: 'Please enter the definition of the emr step.', + emr_serverless_application_id: 'Application ID', + emr_serverless_application_id_tips: + 'Please enter the EMR Serverless Application ID', + emr_serverless_execution_role_arn: 'Execution Role ARN', + emr_serverless_execution_role_arn_tips: + 'Please enter the IAM execution role ARN', + emr_serverless_job_name: 'Job Name', + emr_serverless_job_name_tips: + 'Optional. Defaults to the task name if empty', + emr_serverless_start_job_run_json: 'StartJobRun JSON', + emr_serverless_start_job_run_json_tips: + 'Please enter the StartJobRun request JSON (jobDriver, configurationOverrides, etc.)', zeppelin_note_id: 'zeppelinNoteId', zeppelin_note_id_tips: 'Please enter the note id of your zeppelin note', zeppelin_paragraph_id: 'zeppelinParagraphId', diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts b/dolphinscheduler-ui/src/locales/zh_CN/project.ts index e6b08093c568..11a35461f2bc 100644 --- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts +++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts @@ -723,6 +723,15 @@ export default { emr_flow_define_json_tips: '请输入工作流定义', emr_steps_define_json: 'stepsDefineJson', emr_steps_define_json_tips: '请输入EMR步骤定义', + emr_serverless_application_id: 'Application ID', + emr_serverless_application_id_tips: '请输入 EMR Serverless Application ID', + emr_serverless_execution_role_arn: '执行角色 ARN', + emr_serverless_execution_role_arn_tips: '请输入 IAM 执行角色 ARN', + emr_serverless_job_name: '作业名称', + emr_serverless_job_name_tips: '可选,为空时默认使用任务名称', + emr_serverless_start_job_run_json: 'StartJobRun JSON', + emr_serverless_start_job_run_json_tips: + '请输入 StartJobRun 请求 JSON(jobDriver、configurationOverrides 等)', zeppelin_note_id: 'zeppelinNoteId', zeppelin_note_id_tips: '请输入zeppelin note id', zeppelin_paragraph_id: 'zeppelinParagraphId', diff --git a/dolphinscheduler-ui/src/store/project/task-type.ts b/dolphinscheduler-ui/src/store/project/task-type.ts index b6de8e6e5a23..597340750ae3 100644 --- a/dolphinscheduler-ui/src/store/project/task-type.ts +++ b/dolphinscheduler-ui/src/store/project/task-type.ts @@ -81,6 +81,10 @@ export const TASK_TYPES_MAP = { alias: 'AmazonEMR', helperLinkDisable: true }, + EMR_SERVERLESS: { + alias: 'AmazonEMRServerless', + helperLinkDisable: true + }, ZEPPELIN: { alias: 'ZEPPELIN', helperLinkDisable: true diff --git a/dolphinscheduler-ui/src/store/project/types.ts b/dolphinscheduler-ui/src/store/project/types.ts index ba7773fa5b14..a58135b3e741 100644 --- a/dolphinscheduler-ui/src/store/project/types.ts +++ b/dolphinscheduler-ui/src/store/project/types.ts @@ -38,6 +38,7 @@ type TaskType = | 'SWITCH' | 'SEATUNNEL' | 'EMR' + | 'EMR_SERVERLESS' | 'ZEPPELIN' | 'K8S' | 'JUPYTER' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts index 43c1ed66d138..da4cce0763e5 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/index.ts @@ -66,6 +66,7 @@ export { useDataX } from './use-datax' export { useConditions } from './use-conditions' export { useDependent } from './use-dependent' export { useEmr } from './use-emr' +export { useEmrServerless } from './use-emr-serverless' export { useZeppelin } from './use-zeppelin' export { useNamespace } from './use-namespace' export { useK8s } from './use-k8s' diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr-serverless.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr-serverless.ts new file mode 100644 index 000000000000..ec164275daff --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/fields/use-emr-serverless.ts @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { useI18n } from 'vue-i18n' +import { useCustomParams } from '.' +import type { IJsonItem } from '../types' + +export function useEmrServerless(model: { [field: string]: any }): IJsonItem[] { + const { t } = useI18n() + + return [ + { + type: 'input', + field: 'applicationId', + span: 24, + name: t('project.node.emr_serverless_application_id'), + props: { + placeholder: t('project.node.emr_serverless_application_id_tips') + }, + validate: { + required: true, + trigger: ['input', 'blur'], + message: t('project.node.emr_serverless_application_id_tips') + } + }, + { + type: 'input', + field: 'executionRoleArn', + span: 24, + name: t('project.node.emr_serverless_execution_role_arn'), + props: { + placeholder: t('project.node.emr_serverless_execution_role_arn_tips') + }, + validate: { + required: true, + trigger: ['input', 'blur'], + message: t('project.node.emr_serverless_execution_role_arn_tips') + } + }, + { + type: 'input', + field: 'jobName', + span: 24, + name: t('project.node.emr_serverless_job_name'), + props: { + placeholder: t('project.node.emr_serverless_job_name_tips') + } + }, + { + type: 'editor', + field: 'startJobRunRequestJson', + span: 24, + name: t('project.node.emr_serverless_start_job_run_json'), + props: { + language: 'json' + }, + validate: { + trigger: ['input', 'trigger'], + required: true, + message: t('project.node.emr_serverless_start_job_run_json_tips') + } + }, + ...useCustomParams({ + model, + field: 'localParams', + isSimple: true + }) + ] +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts index 8fc67906775c..b20460c90279 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/format-data.ts @@ -303,6 +303,13 @@ export function formatParams(data: INodeData): { taskParams.stepsDefineJson = data.stepsDefineJson } + if (data.taskType === 'EMR_SERVERLESS') { + taskParams.applicationId = data.applicationId + taskParams.executionRoleArn = data.executionRoleArn + taskParams.jobName = data.jobName + taskParams.startJobRunRequestJson = data.startJobRunRequestJson + } + if (data.taskType === 'ZEPPELIN') { taskParams.noteId = data.noteId taskParams.paragraphId = data.paragraphId diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts index 3db6bb96ea5b..89fa49e17b4a 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/index.ts @@ -33,6 +33,7 @@ import { useConditions } from './use-conditions' import { useDataX } from './use-datax' import { useDependent } from './use-dependent' import { useEmr } from './use-emr' +import { useEmrServerless } from './use-emr-serverless' import { useZeppelin } from './use-zeppelin' import { useK8s } from './use-k8s' import { useJupyter } from './use-jupyter' @@ -70,6 +71,7 @@ export default { DATAX: useDataX, DEPENDENT: useDependent, EMR: useEmr, + EMR_SERVERLESS: useEmrServerless, ZEPPELIN: useZeppelin, K8S: useK8s, JUPYTER: useJupyter, diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr-serverless.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr-serverless.ts new file mode 100644 index 000000000000..1b3408262718 --- /dev/null +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/tasks/use-emr-serverless.ts @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { reactive } from 'vue' +import * as Fields from '../fields/index' +import type { IJsonItem, INodeData } from '../types' +import { ITaskData } from '../types' + +export function useEmrServerless({ + projectCode, + from = 0, + readonly, + data +}: { + projectCode: number + from?: number + readonly?: boolean + data?: ITaskData +}) { + const model = reactive({ + name: '', + taskType: 'EMR_SERVERLESS', + flag: 'YES', + description: '', + timeoutFlag: false, + localParams: [], + environmentCode: null, + failRetryInterval: 1, + failRetryTimes: 0, + workerGroup: 'default', + delayTime: 0, + timeout: 30, + applicationId: '', + executionRoleArn: '', + jobName: '', + startJobRunRequestJson: '', + timeoutNotifyStrategy: ['WARN'] + } as INodeData) + + return { + json: [ + Fields.useName(from), + ...Fields.useTaskDefinition({ projectCode, from, readonly, data, model }), + Fields.useRunFlag(), + Fields.useDescription(), + Fields.useTaskPriority(), + Fields.useWorkerGroup(projectCode), + Fields.useEnvironmentName(model, !data?.id), + ...Fields.useTaskGroup(model, projectCode), + ...Fields.useFailed(), + Fields.useDelayTime(model), + ...Fields.useTimeoutAlarm(model), + ...Fields.useEmrServerless(model), + Fields.usePreTasks() + ] as IJsonItem[], + model + } +} diff --git a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts index 76dcde23ee2d..484f288ffa5e 100644 --- a/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts +++ b/dolphinscheduler-ui/src/views/projects/task/components/node/types.ts @@ -347,6 +347,9 @@ interface ITaskParams { ruleInputParameter?: IRuleParameters jobFlowDefineJson?: string stepsDefineJson?: string + applicationId?: string + executionRoleArn?: string + startJobRunRequestJson?: string zeppelinNoteId?: string zeppelinParagraphId?: string zeppelinRestEndpoint?: string diff --git a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts index ce0db268f4f8..d0eb36b277a8 100644 --- a/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts +++ b/dolphinscheduler-ui/src/views/projects/task/constants/task-type.ts @@ -32,6 +32,7 @@ export type TaskType = | 'SWITCH' | 'SEATUNNEL' | 'EMR' + | 'EMR_SERVERLESS' | 'ZEPPELIN' | 'K8S' | 'JUPYTER' @@ -114,6 +115,10 @@ export const TASK_TYPES_MAP = { alias: 'AmazonEMR', helperLinkDisable: true }, + EMR_SERVERLESS: { + alias: 'AmazonEMRServerless', + helperLinkDisable: true + }, ZEPPELIN: { alias: 'ZEPPELIN', helperLinkDisable: true