diff --git a/docs/resources/dli_datasource_connection.md b/docs/resources/dli_datasource_connection.md
new file mode 100644
index 00000000..6b815412
--- /dev/null
+++ b/docs/resources/dli_datasource_connection.md
@@ -0,0 +1,92 @@
+---
+subcategory: "Data Lake Insight (DLI)"
+---
+
+# flexibleengine_dli_datasource_connection
+
+Manages a DLI datasource **enhanced** connection resource within FlexibleEngine.
+
+## Example Usage
+
+```hcl
+ variable "name" {}
+ variable "vpc_id" {}
+ variable "subnet_id" {}
+
+ resource "flexibleengine_dli_datasource_connection" "test" {
+ name = var.name
+ vpc_id = var.vpc_id
+ subnet_id = var.subnet_id
+ }
+```
+
+## Argument Reference
+
+The following arguments are supported:
+
+* `region` - (Optional, String, ForceNew) Specifies the region in which to create the resource.
+ If omitted, the provider-level region will be used. Changing this parameter will create a new resource.
+
+* `name` - (Required, String, ForceNew) Specifies the name of the datasource connection.
+ The valid length is limited from can contain 1 to 64, only letters, digits and underscore (_) are allowed.
+ The name must start with a lowercase letter and end with a lowercase letter or digit.
+ Changing this parameter will create a new resource.
+
+* `vpc_id` - (Required, String, ForceNew) The VPC ID of the service to be connected.
+ Changing this parameter will create a new resource.
+
+* `subnet_id` - (Required, String, ForceNew) The subnet ID of the service to be connected.
+ Changing this parameter will create a new resource.
+
+* `route_table_id` - (Optional, String, ForceNew) The route table ID associated with the subnet of the service to be
+ connected. Changing this parameter will create a new resource.
+
+* `queues` - (Optional, List) List of queue names that are available for datasource connections.
+
+* `hosts` - (Optional, List) The user-defined host information. A maximum of 20,000 records are supported.
+ The [hosts](#dli_hosts) object structure is documented below.
+
+* `tags` - (Optional, Map, ForceNew) The key/value pairs to associate with the datasource connection.
+ Changing this parameter will create a new resource.
+
+
+The `hosts` block supports:
+
+* `name` - (Required, String) The user-defined host name.
+
+* `ip` - (Required, String) IPv4 address of the host.
+
+## Attribute Reference
+
+In addition to all arguments above, the following attributes are exported:
+
+* `id` - The resource ID.
+
+* `status` - The connection status. The options are as follows:
+ + **ACTIVE**: The datasource connection is activated.
+ + **DELETED**: The datasource connection is deleted.
+
+## Import
+
+The DLI datasource connection can be imported using the `id`, e.g.
+
+```shell
+terraform import flexibleengine_dli_datasource_connection.test 0ce123456a00f2591fabc00385ff1234
+```
+
+Note that the imported state may not be identical to your resource definition, due to some attributes missing from the
+API response, security or some other reason. The missing attributes include: `tags`.
+It is generally recommended running `terraform plan` after importing a resource.
+You can then decide if changes should be applied to the resource, or the resource definition should be updated to
+align with the resource. Also, you can ignore changes as below.
+
+```hcl
+resource "flexibleengine_dli_datasource_connection" "test" {
+ ...
+
+ lifecycle {
+ ignore_changes = [
+ tags,
+ ]
+ }
+}
diff --git a/docs/resources/dli_sql_job.md b/docs/resources/dli_sql_job.md
new file mode 100644
index 00000000..9e58a174
--- /dev/null
+++ b/docs/resources/dli_sql_job.md
@@ -0,0 +1,133 @@
+---
+subcategory: "Data Lake Insight (DLI)"
+---
+
+# flexibleengine_dli_sql_job
+
+Manages DLI SQL job resource within FlexibleEngine
+
+## Example Usage
+
+### Create a Sql job
+
+```hcl
+variable "database_name" {}
+variable "queue_name" {}
+variable "sql" {}
+
+resource "flexibleengine_dli_sql_job" "test" {
+ sql = var.sql
+ database_name = var.database_name
+ queue_name = var.queue_name
+}
+```
+
+## Argument Reference
+
+The following arguments are supported:
+
+* `region` - (Optional, String, ForceNew) Specifies the region in which to create the DLI table resource. If omitted,
+ the provider-level region will be used. Changing this parameter will create a new resource.
+
+* `sql` - (Required, String, ForceNew) Specifies SQL statement that you want to execute.
+ Changing this parameter will create a new resource.
+
+* `database_name` - (Optional, String, ForceNew) Specifies the database where the SQL is executed. This argument does
+ not need to be configured during database creation. Changing this parameter will create a new resource.
+
+* `queue_name` - (Optional, String, ForceNew) Specifies queue which this job to be submitted belongs.
+ Changing this parameter will create a new resource.
+
+* `tags` - (Optional, Map, ForceNew) Specifies label of a Job. Changing this parameter will create a new resource.
+
+* `conf` - (Optional, List, ForceNew) Specifies the configuration parameters for the SQL job. Changing this parameter
+ will create a new resource. The [conf](#dli_conf) object structure is documented below.
+
+
+The `conf` block supports:
+
+* `spark_sql_max_records_per_file` - (Optional, Int, ForceNew) Maximum number of records to be written
+ into a single file. If the value is zero or negative, there is no limit. Default value is `0`.
+ Changing this parameter will create a new resource.
+
+* `spark_sql_auto_broadcast_join_threshold` - (Optional, Int, ForceNew) Maximum size of the table that
+ displays all working nodes when a connection is executed. You can set this parameter to -1 to disable the display.
+ Default value is `209715200`. Changing this parameter will create a new resource.
+ Currently, only the configuration unit metastore table that runs the ANALYZE TABLE COMPUTE statistics no-scan
+ command and the file-based data source table that directly calculates statistics based on data files are supported.
+ Changing this parameter will create a new resource.
+
+* `spark_sql_shuffle_partitions` - (Optional, Int, ForceNew) Default number of partitions used to filter
+ data for join or aggregation. Default value is `4096`. Changing this parameter will create a new resource.
+
+* `spark_sql_dynamic_partition_overwrite_enabled` - (Optional, Bool, ForceNew) In dynamic mode, Spark does not delete
+ the previous partitions and only overwrites the partitions without data during execution. Default value is `false`.
+ Changing this parameter will create a new resource.
+
+* `spark_sql_files_max_partition_bytes` - (Optional, Int, ForceNew) Maximum number of bytes to be packed into a
+ single partition when a file is read. Default value is `134217728`.
+ Changing this parameter will create a new resource.
+
+* `spark_sql_bad_records_path` - (Optional, String, ForceNew) Path of bad records. Changing this parameter will create
+ a new resource.
+
+* `dli_sql_sqlasync_enabled` - (Optional, Bool, ForceNew) Specifies whether DDL and DCL statements are executed
+ asynchronously. The value true indicates that asynchronous execution is enabled. Default value is `false`.
+ Changing this parameter will create a new resource.
+
+* `dli_sql_job_timeout` - (Optional, Int, ForceNew) Sets the job running timeout interval. If the timeout interval
+ expires, the job is canceled. Unit: `ms`. Changing this parameter will create a new resource.
+
+## Attribute Reference
+
+In addition to all arguments above, the following attributes are exported:
+
+* `id` - Indicates a resource ID in UUID format.
+
+* `owner` - User who submits a job.
+
+* `job_type` - The type of job, includes **DDL**, **DCL**, **IMPORT**, **EXPORT**, **QUERY** and **INSERT**.
+
+* `status` - Status of a job, includes **RUNNING**, **SCALING**, **LAUNCHING**, **FINISHED**, **FAILED**,
+ and **CANCELED**.
+
+* `start_time` - Time when a job is started, in RFC-3339 format. e.g. `2019-10-12T07:20:50.52Z`
+
+* `duration` - Job running duration (unit: millisecond).
+
+* `schema` - When the statement type is DDL, the column name and type of DDL are displayed.
+
+* `rows` - When the statement type is DDL, results of the DDL are displayed.
+
+## Timeouts
+
+This resource provides the following timeouts configuration options:
+
+* `create` - Default is 20 minutes.
+* `delete` - Default is 45 minutes.
+
+## Import
+
+DLI SQL job can be imported by `id`, e.g.
+
+```shell
+terraform import flexibleengine_dli_sql_job.example 7f803d70-c533-469f-8431-e378f3e97123
+```
+
+Note that the imported state may not be identical to your resource definition, due to some attributes missing from the
+API response, security or some other reason. The missing attributes include: `conf`, `rows` and `schema`.
+It is generally recommended running `terraform plan` after importing a resource. You can then decide if changes should
+be applied to the resource, or the resource definition should be updated to align with the resource. Also, you can
+ignore changes as below.
+
+```hcl
+resource "flexibleengine_dli_sql_job" "test" {
+ ...
+
+ lifecycle {
+ ignore_changes = [
+ conf, rows, schema
+ ]
+ }
+}
+```
diff --git a/docs/resources/dli_template_flink.md b/docs/resources/dli_template_flink.md
new file mode 100644
index 00000000..934f2473
--- /dev/null
+++ b/docs/resources/dli_template_flink.md
@@ -0,0 +1,81 @@
+---
+subcategory: "Data Lake Insight (DLI)"
+---
+
+# flexibleengine_dli_template_flink
+
+Manages a DLI Flink template resource within FlexibleEngine.
+
+## Example Usage
+
+```hcl
+variable "sql" {}
+
+resource "flexibleengine_dli_template_flink" "test" {
+ name = "demo"
+ type = "flink_sql_job"
+ sql = var.sql
+ description = "This is a demo"
+
+ tags = {
+ foo = "bar"
+ key = "value"
+ }
+}
+```
+
+## Argument Reference
+
+The following arguments are supported:
+
+* `region` - (Optional, String, ForceNew) Specifies the region in which to create the resource.
+ If omitted, the provider-level region will be used. Changing this parameter will create a new resource.
+
+* `name` - (Required, String) The name of the flink template.
+
+* `sql` - (Optional, String) The statement of the flink template.
+
+* `description` - (Optional, String) The description of the flink template.
+
+* `type` - (Optional, String, ForceNew) The type of the flink template.
+ Valid values are **flink_sql_job** and **flink_opensource_sql_job**.
+ Defaults to **flink_sql_job**.
+
+ Changing this parameter will create a new resource.
+
+* `tags` - (Optional, Map, ForceNew) The key/value pairs to associate with the flink template.
+
+ Changing this parameter will create a new resource.
+
+## Attribute Reference
+
+In addition to all arguments above, the following attributes are exported:
+
+* `id` - The resource ID.
+
+## Import
+
+The flink template can be imported using the `id`, e.g.
+
+```shell
+terraform import flexibleengine_dli_template_flink.test 1231
+```
+
+Note that the imported state may not be identical to your resource definition, due to some attributes missing from the
+API response, security or some other reason. The missing attributes include:
+`tags`.
+It is generally recommended running `terraform plan` after importing a resource.
+You can then decide if changes should be applied to the resource, or the resource definition should be updated to align
+with the resource. Also, you can ignore changes as below.
+
+```bash
+resource "flexibleengine_dli_template_flink" "test" {
+ ...
+
+ lifecycle {
+ ignore_changes = [
+ tags
+ ]
+ }
+}
+```
diff --git a/flexibleengine/acceptance/resource_flexibleengine_dli_datasource_connection_test.go b/flexibleengine/acceptance/resource_flexibleengine_dli_datasource_connection_test.go
new file mode 100644
index 00000000..2b84bee2
--- /dev/null
+++ b/flexibleengine/acceptance/resource_flexibleengine_dli_datasource_connection_test.go
@@ -0,0 +1,161 @@
+package acceptance
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+
+ "github.com/chnsz/golangsdk"
+
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/utils"
+)
+
+func getDatasourceConnectionResourceFunc(cfg *config.Config, state *terraform.ResourceState) (interface{}, error) {
+ region := OS_REGION_NAME
+ // getDatasourceConnection: Query the DLI instance
+ var (
+ getDatasourceConnectionHttpUrl = "v2.0/{project_id}/datasource/enhanced-connections/{id}"
+ getDatasourceConnectionProduct = "dli"
+ )
+ getDatasourceConnectionClient, err := cfg.NewServiceClient(getDatasourceConnectionProduct, region)
+ if err != nil {
+ return nil, fmt.Errorf("error creating DLI Client: %s", err)
+ }
+
+ getDatasourceConnectionPath := getDatasourceConnectionClient.Endpoint + getDatasourceConnectionHttpUrl
+ getDatasourceConnectionPath = strings.ReplaceAll(getDatasourceConnectionPath, "{project_id}",
+ getDatasourceConnectionClient.ProjectID)
+ getDatasourceConnectionPath = strings.ReplaceAll(getDatasourceConnectionPath, "{id}", state.Primary.ID)
+
+ getDatasourceConnectionOpt := golangsdk.RequestOpts{
+ KeepResponseBody: true,
+ OkCodes: []int{
+ 200,
+ },
+ }
+ getDatasourceConnectionResp, err := getDatasourceConnectionClient.Request("GET", getDatasourceConnectionPath,
+ &getDatasourceConnectionOpt)
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving DatasourceConnection: %s", err)
+ }
+
+ getDatasourceConnectionRespBody, err := utils.FlattenResponse(getDatasourceConnectionResp)
+ if err != nil {
+ return nil, err
+ }
+
+ if utils.PathSearch("status", getDatasourceConnectionRespBody, "") == "DELETED" {
+ return nil, golangsdk.ErrDefault404{}
+ }
+
+ return getDatasourceConnectionRespBody, nil
+}
+
+func TestAccDatasourceConnection_basic(t *testing.T) {
+ var obj interface{}
+
+ name := acceptance.RandomAccResourceName()
+ rName := "flexibleengine_dli_datasource_connection.test"
+
+ rc := acceptance.InitResourceCheck(
+ rName,
+ &obj,
+ getDatasourceConnectionResourceFunc,
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: TestAccProviderFactories,
+ CheckDestroy: rc.CheckResourceDestroy(),
+ Steps: []resource.TestStep{
+ {
+ Config: testDatasourceConnection_basic(name),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(rName, "name", name),
+ resource.TestCheckResourceAttrPair(rName, "vpc_id", "flexibleengine_vpc_v1.test", "id"),
+ resource.TestCheckResourceAttrPair(rName, "subnet_id", "flexibleengine_vpc_subnet_v1.test", "id"),
+ resource.TestCheckResourceAttr(rName, "tags.foo", "bar"),
+ ),
+ },
+ {
+ Config: testDatasourceConnection_basic_update(name),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(rName, "name", name),
+ resource.TestCheckResourceAttr(rName, "queues.0", name),
+ resource.TestCheckResourceAttr(rName, "hosts.0.ip", "172.0.0.2"),
+ resource.TestCheckResourceAttr(rName, "hosts.0.name", "test.test.com"),
+ ),
+ },
+ {
+ ResourceName: rName,
+ ImportState: true,
+ ImportStateVerify: true,
+ ImportStateVerifyIgnore: []string{"tags"},
+ },
+ },
+ })
+}
+
+func testDatasourceConnectionbase(name string) string {
+ return fmt.Sprintf(`
+resource "flexibleengine_vpc_v1" "test" {
+ name = "%s"
+ cidr = "192.168.0.0/16"
+}
+
+resource "flexibleengine_vpc_subnet_v1" "test" {
+ name = "%s"
+ vpc_id = flexibleengine_vpc_v1.test.id
+ cidr = "192.168.0.0/24"
+ gateway_ip = "192.168.0.1"
+}
+`, name, name)
+}
+
+func testDatasourceConnection_basic(name string) string {
+ return fmt.Sprintf(`
+%s
+
+resource "flexibleengine_dli_datasource_connection" "test" {
+ name = "%s"
+ vpc_id = flexibleengine_vpc_v1.test.id
+ subnet_id = flexibleengine_vpc_subnet_v1.test.id
+
+ tags = {
+ foo = "bar"
+ }
+}
+`, testDatasourceConnectionbase(name), name)
+}
+
+func testDatasourceConnection_basic_update(name string) string {
+ return fmt.Sprintf(`
+%s
+
+resource "flexibleengine_dli_queue" "test" {
+ name = "%s"
+ cu_count = 16
+ resource_mode = 1
+}
+
+resource "flexibleengine_dli_datasource_connection" "test" {
+ name = "%s"
+ vpc_id = flexibleengine_vpc_v1.test.id
+ subnet_id = flexibleengine_vpc_subnet_v1.test.id
+
+ queues = [flexibleengine_dli_queue.test.name]
+
+ hosts {
+ ip = "172.0.0.2"
+ name = "test.test.com"
+ }
+}
+`, testDatasourceConnectionbase(name), name, name)
+}
diff --git a/flexibleengine/acceptance/resource_flexibleengine_dli_sql_job_test.go b/flexibleengine/acceptance/resource_flexibleengine_dli_sql_job_test.go
new file mode 100644
index 00000000..4ef8431e
--- /dev/null
+++ b/flexibleengine/acceptance/resource_flexibleengine_dli_sql_job_test.go
@@ -0,0 +1,233 @@
+package acceptance
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/chnsz/golangsdk/openstack/dli/v1/sqljob"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance"
+
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config"
+)
+
+func getDliSqlJobResourceFunc(config *config.Config, state *terraform.ResourceState) (interface{}, error) {
+ client, err := config.DliV1Client(OS_REGION_NAME)
+ if err != nil {
+ return nil, fmt.Errorf("error creating Dli v1 client, err=%s", err)
+ }
+ return sqljob.Status(client, state.Primary.ID)
+}
+
+// check the DDL sql
+func TestAccResourceDliSqlJob_basic(t *testing.T) {
+ var sqlJobObj sqljob.SqlJobOpts
+ resourceName := "flexibleengine_dli_sql_job.test"
+ name := acceptance.RandomAccResourceName()
+ obsBucketName := acceptance.RandomAccResourceNameWithDash()
+
+ rc := acceptance.InitResourceCheck(
+ resourceName,
+ &sqlJobObj,
+ getDliSqlJobResourceFunc,
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: TestAccProviderFactories,
+ CheckDestroy: testAccCheckDliSqlJobDestroy,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccSqlJobBaseResource_basic(name, obsBucketName),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(resourceName, "sql", fmt.Sprint("DESC ", name)),
+ resource.TestCheckResourceAttr(resourceName, "database_name", name),
+ resource.TestCheckResourceAttr(resourceName, "job_type", "DDL"),
+ ),
+ },
+ {
+ ResourceName: resourceName,
+ ImportState: true,
+ ImportStateVerify: true,
+ ImportStateVerifyIgnore: []string{"rows", "schema"},
+ },
+ },
+ })
+}
+
+func TestAccResourceDliSqlJob_query(t *testing.T) {
+ var sqlJobObj sqljob.SqlJobOpts
+ resourceName := "flexibleengine_dli_sql_job.test"
+ name := acceptance.RandomAccResourceName()
+ obsBucketName := acceptance.RandomAccResourceNameWithDash()
+
+ rc := acceptance.InitResourceCheck(
+ resourceName,
+ &sqlJobObj,
+ getDliSqlJobResourceFunc,
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: TestAccProviderFactories,
+ CheckDestroy: testAccCheckDliSqlJobDestroy,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccSqlJobBaseResource_query(name, obsBucketName),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(resourceName, "sql", fmt.Sprint("SELECT * FROM ", name)),
+ resource.TestCheckResourceAttr(resourceName, "database_name", name),
+ resource.TestCheckResourceAttr(resourceName, "queue_name", "default"),
+ resource.TestCheckResourceAttr(resourceName, "job_type", "QUERY"),
+ ),
+ },
+ {
+ ResourceName: resourceName,
+ ImportState: true,
+ ImportStateVerify: true,
+ ImportStateVerifyIgnore: []string{"rows", "schema"},
+ },
+ },
+ })
+}
+
+func TestAccResourceDliSqlJob_async(t *testing.T) {
+ var sqlJobObj sqljob.SqlJobOpts
+ resourceName := "flexibleengine_dli_sql_job.test"
+ name := acceptance.RandomAccResourceName()
+ obsBucketName := acceptance.RandomAccResourceNameWithDash()
+
+ rc := acceptance.InitResourceCheck(
+ resourceName,
+ &sqlJobObj,
+ getDliSqlJobResourceFunc,
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: TestAccProviderFactories,
+ CheckDestroy: testAccCheckDliSqlJobDestroy,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccSqlJobResource_aync(name, obsBucketName),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(resourceName, "sql", fmt.Sprint("SELECT * FROM ", name)),
+ resource.TestCheckResourceAttr(resourceName, "database_name", name),
+ resource.TestCheckResourceAttr(resourceName, "queue_name", "default"),
+ resource.TestCheckResourceAttr(resourceName, "job_type", "QUERY"),
+ ),
+ },
+ {
+ ResourceName: resourceName,
+ ImportState: true,
+ ImportStateVerify: true,
+ ImportStateVerifyIgnore: []string{"rows", "schema", "conf", "duration", "status"},
+ },
+ },
+ })
+}
+
+func testAccSqlJobBaseResource(name, obsBucketName string) string {
+ return fmt.Sprintf(`
+resource "flexibleengine_obs_bucket" "test" {
+ bucket = "%s"
+ acl = "private"
+}
+
+resource "flexibleengine_obs_bucket_object" "test" {
+ bucket = flexibleengine_obs_bucket.test.bucket
+ key = "user/data/user.csv"
+ content = "Jason,Tokyo"
+ content_type = "text/plain"
+}
+
+resource "flexibleengine_dli_database" "test" {
+ name = "%s"
+ description = "For terraform acc test"
+}
+
+resource "flexibleengine_dli_table" "test" {
+ database_name = flexibleengine_dli_database.test.name
+ name = "%s"
+ data_location = "OBS"
+ description = "dli table test"
+ data_format = "csv"
+ bucket_location = "obs://${flexibleengine_obs_bucket_object.test.bucket}/user/data"
+
+ columns {
+ name = "name"
+ type = "string"
+ description = "person name"
+ }
+
+ columns {
+ name = "addrss"
+ type = "string"
+ description = "home address"
+ }
+}
+`, obsBucketName, name, name)
+}
+
+func testAccSqlJobBaseResource_basic(name, obsBucketName string) string {
+ return fmt.Sprintf(`
+%s
+
+resource "flexibleengine_dli_sql_job" "test" {
+ sql = "DESC ${flexibleengine_dli_table.test.name}"
+ database_name = flexibleengine_dli_database.test.name
+}
+`, testAccSqlJobBaseResource(name, obsBucketName))
+}
+
+func testAccSqlJobBaseResource_query(name, obsBucketName string) string {
+ return fmt.Sprintf(`
+%s
+
+resource "flexibleengine_dli_sql_job" "test" {
+ sql = "SELECT * FROM ${flexibleengine_dli_table.test.name}"
+ database_name = flexibleengine_dli_database.test.name
+}
+`, testAccSqlJobBaseResource(name, obsBucketName))
+}
+
+func testAccSqlJobResource_aync(name, obsBucketName string) string {
+ return fmt.Sprintf(`
+%s
+
+resource "flexibleengine_dli_sql_job" "test" {
+ sql = "SELECT * FROM ${flexibleengine_dli_table.test.name}"
+ database_name = flexibleengine_dli_database.test.name
+
+ conf {
+ dli_sql_sqlasync_enabled = true
+ }
+}
+`, testAccSqlJobBaseResource(name, obsBucketName))
+}
+
+func testAccCheckDliSqlJobDestroy(s *terraform.State) error {
+ config := acceptance.TestAccProvider.Meta().(*config.Config)
+ client, err := config.DliV1Client(OS_REGION_NAME)
+ if err != nil {
+ return fmt.Errorf("error creating Dli client, err=%s", err)
+ }
+
+ for _, rs := range s.RootModule().Resources {
+ if rs.Type != "flexibleengine_dli_sql_job" {
+ continue
+ }
+
+ res, err := sqljob.Status(client, rs.Primary.ID)
+ if err == nil && res != nil && (res.Status != sqljob.JobStatusCancelled &&
+ res.Status != sqljob.JobStatusFinished && res.Status != sqljob.JobStatusFailed) {
+ return fmt.Errorf("flexibleengine_dli_sql_job still exists:%s,%+v,%+v", rs.Primary.ID, err, res)
+ }
+ }
+
+ return nil
+}
diff --git a/flexibleengine/acceptance/resource_flexibleengine_dli_template_flink_test.go b/flexibleengine/acceptance/resource_flexibleengine_dli_template_flink_test.go
new file mode 100644
index 00000000..ee91f670
--- /dev/null
+++ b/flexibleengine/acceptance/resource_flexibleengine_dli_template_flink_test.go
@@ -0,0 +1,142 @@
+package acceptance
+
+import (
+ "fmt"
+ "strings"
+ "testing"
+
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+
+ "github.com/chnsz/golangsdk"
+
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/config"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/services/acceptance"
+ "github.com/huaweicloud/terraform-provider-huaweicloud/huaweicloud/utils"
+)
+
+func getFlinkTemplateResourceFunc(cfg *config.Config, state *terraform.ResourceState) (interface{}, error) {
+ region := OS_REGION_NAME
+ // getFlinkTemplate: Query the Flink template.
+ var (
+ getFlinkTemplateHttpUrl = "v1.0/{project_id}/streaming/job-templates"
+ getFlinkTemplateProduct = "dli"
+ )
+ getFlinkTemplateClient, err := cfg.NewServiceClient(getFlinkTemplateProduct, region)
+ if err != nil {
+ return nil, fmt.Errorf("error creating DLI Client: %s", err)
+ }
+
+ getFlinkTemplatePath := getFlinkTemplateClient.Endpoint + getFlinkTemplateHttpUrl
+ getFlinkTemplatePath = strings.ReplaceAll(getFlinkTemplatePath, "{project_id}", getFlinkTemplateClient.ProjectID)
+
+ if v, ok := state.Primary.Attributes["name"]; ok {
+ getFlinkTemplatePath += fmt.Sprintf("?limit=100&name=%s", v)
+ }
+
+ getFlinkTemplateOpt := golangsdk.RequestOpts{
+ KeepResponseBody: true,
+ OkCodes: []int{
+ 200,
+ },
+ }
+ getFlinkTemplateResp, err := getFlinkTemplateClient.Request("GET", getFlinkTemplatePath, &getFlinkTemplateOpt)
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving FlinkTemplate: %s", err)
+ }
+
+ getFlinkTemplateRespBody, err := utils.FlattenResponse(getFlinkTemplateResp)
+ if err != nil {
+ return nil, fmt.Errorf("error retrieving Flink template: %s", err)
+ }
+
+ jsonPath := fmt.Sprintf("template_list.templates[?template_id==`%s`]|[0]", state.Primary.ID)
+ template := utils.PathSearch(jsonPath, getFlinkTemplateRespBody, nil)
+ if template == nil {
+ return nil, fmt.Errorf("no data found")
+ }
+ return template, nil
+}
+
+func TestAccFlinkTemplate_basic(t *testing.T) {
+ var obj interface{}
+
+ name := acceptance.RandomAccResourceName()
+ rName := "flexibleengine_dli_template_flink.test"
+
+ rc := acceptance.InitResourceCheck(
+ rName,
+ &obj,
+ getFlinkTemplateResourceFunc,
+ )
+
+ resource.ParallelTest(t, resource.TestCase{
+ PreCheck: func() { testAccPreCheck(t) },
+ ProviderFactories: TestAccProviderFactories,
+ CheckDestroy: rc.CheckResourceDestroy(),
+ Steps: []resource.TestStep{
+ {
+ Config: testFlinkTemplate_basic(name),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(rName, "name", name),
+ resource.TestCheckResourceAttr(rName, "sql", "select * from source_table"),
+ resource.TestCheckResourceAttr(rName, "description", "This is a demo"),
+ resource.TestCheckResourceAttr(rName, "type", "flink_sql_job"),
+ resource.TestCheckResourceAttr(rName, "tags.foo", "bar"),
+ resource.TestCheckResourceAttr(rName, "tags.key", "value"),
+ ),
+ },
+ {
+ Config: testFlinkTemplate_basic_update(name),
+ Check: resource.ComposeTestCheckFunc(
+ rc.CheckResourceExists(),
+ resource.TestCheckResourceAttr(rName, "name", name),
+ resource.TestCheckResourceAttr(rName, "sql", "select * from source_table2"),
+ resource.TestCheckResourceAttr(rName, "description", "This is a demo2"),
+ resource.TestCheckResourceAttr(rName, "type", "flink_sql_job"),
+ resource.TestCheckResourceAttr(rName, "tags.foo", "bar"),
+ resource.TestCheckResourceAttr(rName, "tags.key", "value2"),
+ ),
+ },
+ {
+ ResourceName: rName,
+ ImportState: true,
+ ImportStateVerify: true,
+ ImportStateVerifyIgnore: []string{"tags"},
+ },
+ },
+ })
+}
+
+func testFlinkTemplate_basic(name string) string {
+ return fmt.Sprintf(`
+resource "flexibleengine_dli_template_flink" "test" {
+ name = "%s"
+ type = "flink_sql_job"
+ sql = "select * from source_table"
+ description = "This is a demo"
+
+ tags = {
+ foo = "bar"
+ key = "value"
+ }
+}
+`, name)
+}
+
+func testFlinkTemplate_basic_update(name string) string {
+ return fmt.Sprintf(`
+resource "flexibleengine_dli_template_flink" "test" {
+ name = "%s"
+ type = "flink_sql_job"
+ sql = "select * from source_table2"
+ description = "This is a demo2"
+
+ tags = {
+ foo = "bar"
+ key = "value2"
+ }
+}
+`, name)
+}
diff --git a/flexibleengine/provider.go b/flexibleengine/provider.go
index 92d9261d..37f75ca2 100644
--- a/flexibleengine/provider.go
+++ b/flexibleengine/provider.go
@@ -499,16 +499,20 @@ func Provider() *schema.Provider {
"flexibleengine_dms_rocketmq_topic": dms.ResourceDmsRocketMQTopic(),
"flexibleengine_dms_rocketmq_user": dms.ResourceDmsRocketMQUser(),
- "flexibleengine_dli_database": dli.ResourceDliSqlDatabaseV1(),
- "flexibleengine_dli_global_variable": dli.ResourceGlobalVariable(),
- "flexibleengine_dli_package": dli.ResourceDliPackageV2(),
- "flexibleengine_dli_spark_job": dli.ResourceDliSparkJobV2(),
- "flexibleengine_dli_table": dli.ResourceDliTable(),
- "flexibleengine_dli_flinksql_job": dli.ResourceFlinkSqlJob(),
- "flexibleengine_drs_job": drs.ResourceDrsJob(),
- "flexibleengine_fgs_dependency": fgs.ResourceFgsDependency(),
- "flexibleengine_fgs_function": fgs.ResourceFgsFunctionV2(),
- "flexibleengine_fgs_trigger": fgs.ResourceFunctionGraphTrigger(),
+ "flexibleengine_dli_database": dli.ResourceDliSqlDatabaseV1(),
+ "flexibleengine_dli_datasource_connection": dli.ResourceDatasourceConnection(),
+ "flexibleengine_dli_flinksql_job": dli.ResourceFlinkSqlJob(),
+ "flexibleengine_dli_global_variable": dli.ResourceGlobalVariable(),
+ "flexibleengine_dli_package": dli.ResourceDliPackageV2(),
+ "flexibleengine_dli_spark_job": dli.ResourceDliSparkJobV2(),
+ "flexibleengine_dli_sql_job": dli.ResourceSqlJob(),
+ "flexibleengine_dli_table": dli.ResourceDliTable(),
+ "flexibleengine_dli_template_flink": dli.ResourceFlinkTemplate(),
+
+ "flexibleengine_drs_job": drs.ResourceDrsJob(),
+ "flexibleengine_fgs_dependency": fgs.ResourceFgsDependency(),
+ "flexibleengine_fgs_function": fgs.ResourceFgsFunctionV2(),
+ "flexibleengine_fgs_trigger": fgs.ResourceFunctionGraphTrigger(),
"flexibleengine_gaussdb_cassandra_instance": gaussdb.ResourceGeminiDBInstanceV3(),
"flexibleengine_gaussdb_influx_instance": gaussdb.ResourceGaussDBInfluxInstanceV3(),