diff --git a/docs/resources/dms_consumer_group_v2.md b/docs/resources/dms_consumer_group_v2.md new file mode 100644 index 000000000..31acefc3c --- /dev/null +++ b/docs/resources/dms_consumer_group_v2.md @@ -0,0 +1,99 @@ +--- +subcategory: "Distributed Message Service (DMS)" +layout: "opentelekomcloud" +page_title: "OpenTelekomCloud: opentelekomcloud_dms_consumer_group_v2" +sidebar_current: "docs-opentelekomcloud-resource-dms-consumer-group-v2" +description: |- + Manages an up-to-date DMS Consumer Group v2 resource within OpenTelekomCloud. +--- + +Up-to-date reference of API arguments for DMS instance management you can get at +[documentation portal](https://docs.otc.t-systems.com/distributed-message-service/api-ref/apis_v2_recommended/instance_management/index.html) + +# opentelekomcloud_dms_consumer_group_v2 + +Manage DMS consumer group v2 resource within OpenTelekomCloud. + +## Example Usage + +```hcl +variable "instance_id" {} + +resource "opentelekomcloud_dms_consumer_group_v2" "group_1" { + instance_id = var.instance_id + group_name = "dms_consumer_group" + description = "Sample consumer group" +} +``` + +## Argument Reference + +The following arguments are supported: + +* `instance_id` - (Required, String, ForceNew) Specifies the ID of the DMS instance. + + Changing this parameter will create a new resource. + +* `group_name` - (Required, String, ForceNew) Specifies the name of the DMS consumer group. + + Changing this parameter will create a new resource. + +* `description` - (Optional, String, ForceNew) Specifies any description for the DMS consumer group. + + Changing this parameter will create a new resource. + +## Attribute Reference + +In addition to all arguments above, the following attribute is exported: + +* `state` - Indicates the Consumer group status. The value can be: + * Dead: The consumer group has no members and no metadata. + * Empty: The consumer group has metadata but has no members. + * PreparingRebalance: The consumer group is to be rebalanced. + * CompletingRebalance: All members have jointed the group. + * Stable: Members in the consumer group can consume messages normally. + +* `assignment_strategy` - Indicates the partition assignment policy. +* `coordinator_id` - Indicates the coordinator ID. +* `members` - Indicates the consumer list. The structure is documented below. +* `group_message_offsets` - Indicates the consumer offset. The structure is documented below. + +The `members` block contains: + +* `host` - Indicates the consumer address. +* `member_id` - Indicates the consumer ID. +* `client_id` - Indicates the client ID. +* `assignments` - Indicates the details about the partition assigned to the consumer. The structure is as follows: + + `topic` - Indicates the topic name. + + `partitions` - Indicates the partition list. + +The `group_message_offsets` block contains: + +* `partition` - Indicates the partition number. +* `lag` - Indicates the number of remaining messages that can be retrieved, that is, the number of accumulated messages. +* `topic` - Indicates the topic name. +* `message_current_offset` - Indicates the consumer offset. +* `message_log_end_offset` - Indicates the log end offset (LEO). + +## Import + +DMS consumer groups can be imported using their `group_name` and related `instance_id`, separated by a slash, e.g. + +```bash +$ terraform import opentelekomcloud_dms_consumer_group_v2.test_group / +``` +## Notes + +But due to some attributes missing from the API response, it's required to ignore changes as below: + +```hcl +resource "opentelekomcloud_dms_consumer_group_v2" "group_1" { + # ... + + lifecycle { + ignore_changes = [ + description, + ] + } +} +``` diff --git a/opentelekomcloud/acceptance/dms/resource_opentelekomcloud_dms_consumer_group_v2_test.go b/opentelekomcloud/acceptance/dms/resource_opentelekomcloud_dms_consumer_group_v2_test.go new file mode 100644 index 000000000..f32ce9db4 --- /dev/null +++ b/opentelekomcloud/acceptance/dms/resource_opentelekomcloud_dms_consumer_group_v2_test.go @@ -0,0 +1,103 @@ +package acceptance + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2/instances/management" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/acceptance/common" + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/acceptance/env" + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/common/cfg" +) + +const resourceGroupV2Name = "opentelekomcloud_dms_consumer_group_v2.group_1" + +func getDmsConsumerGroupResourceFunc(cfg *cfg.Config, state *terraform.ResourceState) (interface{}, error) { + client, err := cfg.DmsV2Client(env.OS_REGION_NAME) + if err != nil { + return nil, fmt.Errorf("error creating DMS V2 Client: %s", err) + } + getResp, err := management.GetConsumerGroup(client, state.Primary.Attributes["instance_id"], state.Primary.Attributes["group_name"]) + if err != nil { + return nil, fmt.Errorf("error fetching dms group: %s", err) + } + return getResp.Group, nil +} + +func TestAccDmsConsumerGroupV2_basic(t *testing.T) { + var dmsConsumerGroup management.Group + rc := common.InitResourceCheck( + resourceGroupV2Name, + &dmsConsumerGroup, + getDmsConsumerGroupResourceFunc, + ) + + var groupName = fmt.Sprintf("dms_consumer_group_%s", acctest.RandString(5)) + + resource.Test(t, resource.TestCase{ + PreCheck: func() { common.TestAccPreCheck(t) }, + ProviderFactories: common.TestAccProviderFactories, + CheckDestroy: rc.CheckResourceDestroy(), + Steps: []resource.TestStep{ + { + Config: testAccDmsV2ConsumerGroupBasic(groupName), + Check: resource.ComposeTestCheckFunc( + rc.CheckResourceExists(), + resource.TestCheckResourceAttr(resourceGroupV2Name, "group_name", groupName), + ), + }, + { + ResourceName: resourceGroupV2Name, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{ + "description", + }, + }, + }, + }) +} + +func testAccDmsV2ConsumerGroupBasic(groupName string) string { + return fmt.Sprintf(` +%s + +%s + +data "opentelekomcloud_dms_az_v1" "az_1" {} + +data "opentelekomcloud_dms_product_v1" "product_1" { + engine = "kafka" + instance_type = "cluster" + version = "2.3.0" +} + +resource "opentelekomcloud_dms_instance_v2" "instance_1" { + name = "dms_test_cg_instance" + engine = "kafka" + storage_space = data.opentelekomcloud_dms_product_v1.product_1.storage + access_user = "user" + password = "Dmstest@123" + vpc_id = data.opentelekomcloud_vpc_subnet_v1.shared_subnet.vpc_id + security_group_id = data.opentelekomcloud_networking_secgroup_v2.default_secgroup.id + subnet_id = data.opentelekomcloud_vpc_subnet_v1.shared_subnet.network_id + available_zones = [data.opentelekomcloud_dms_az_v1.az_1.id] + product_id = data.opentelekomcloud_dms_product_v1.product_1.id + engine_version = data.opentelekomcloud_dms_product_v1.product_1.version + storage_spec_code = data.opentelekomcloud_dms_product_v1.product_1.storage_spec_code +} + +resource "opentelekomcloud_dms_consumer_group_v2" "group_1" { + instance_id = opentelekomcloud_dms_instance_v2.instance_1.id + group_name = "%s" + description = "Test consumer group" +} + + +`, common.DataSourceSecGroupDefault, common.DataSourceSubnet, groupName) +} diff --git a/opentelekomcloud/provider.go b/opentelekomcloud/provider.go index ab92d28b5..162ed8643 100644 --- a/opentelekomcloud/provider.go +++ b/opentelekomcloud/provider.go @@ -441,6 +441,7 @@ func Provider() *schema.Provider { "opentelekomcloud_dns_ptrrecord_v2": dns.ResourceDNSPtrRecordV2(), "opentelekomcloud_dns_recordset_v2": dns.ResourceDNSRecordSetV2(), "opentelekomcloud_dns_zone_v2": dns.ResourceDNSZoneV2(), + "opentelekomcloud_dms_consumer_group_v2": dms.ResourceDmsConsumerGroupV2(), "opentelekomcloud_dms_instance_v1": dms.ResourceDmsInstancesV1(), "opentelekomcloud_dms_instance_v2": dms.ResourceDmsInstancesV2(), "opentelekomcloud_dms_dedicated_instance_v2": dms.ResourceDmsDedicatedInstanceV2(), diff --git a/opentelekomcloud/services/dms/resource_opentelekomcloud_dms_consumer_group_v2.go b/opentelekomcloud/services/dms/resource_opentelekomcloud_dms_consumer_group_v2.go new file mode 100644 index 000000000..72e9cb8da --- /dev/null +++ b/opentelekomcloud/services/dms/resource_opentelekomcloud_dms_consumer_group_v2.go @@ -0,0 +1,253 @@ +package dms + +import ( + "context" + "fmt" + "log" + + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + golangsdk "github.com/opentelekomcloud/gophertelekomcloud" + "github.com/opentelekomcloud/gophertelekomcloud/openstack/dms/v2/instances/management" + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/common" + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/common/cfg" + "github.com/opentelekomcloud/terraform-provider-opentelekomcloud/opentelekomcloud/common/fmterr" +) + +func ResourceDmsConsumerGroupV2() *schema.Resource { + return &schema.Resource{ + CreateContext: resourceDmsConsumerGroupV2Create, + ReadContext: resourceDmsConsumerGroupV2Read, + DeleteContext: resourceDmsConsumerGroupV2Delete, + + Importer: &schema.ResourceImporter{ + StateContext: common.ImportByPath("instance_id", "group_name"), + }, + + Schema: map[string]*schema.Schema{ + "instance_id": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "group_name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "description": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.StringLenBetween(0, 200), + }, + "state": { + Type: schema.TypeString, + Computed: true, + }, + "assignment_strategy": { + Type: schema.TypeString, + Computed: true, + }, + "coordinator_id": { + Type: schema.TypeInt, + Computed: true, + }, + "members": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "host": { + Type: schema.TypeString, + Computed: true, + }, + "member_id": { + Type: schema.TypeString, + Computed: true, + }, + "client_id": { + Type: schema.TypeString, + Computed: true, + }, + "assignments": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "topic": { + Type: schema.TypeString, + Computed: true, + }, + "partitions": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Schema{ + Type: schema.TypeInt, + }, + }, + }, + }, + }, + }, + }, + }, + "group_message_offsets": { + Type: schema.TypeList, + Computed: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "partition": { + Type: schema.TypeInt, + Computed: true, + }, + "lag": { + Type: schema.TypeInt, + Computed: true, + }, + "topic": { + Type: schema.TypeString, + Computed: true, + }, + "message_current_offset": { + Type: schema.TypeInt, + Computed: true, + }, + "message_log_end_offset": { + Type: schema.TypeInt, + Computed: true, + }, + }, + }, + }, + "region": { + Type: schema.TypeString, + Computed: true, + }, + }, + } +} + +func resourceDmsConsumerGroupV2Create(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + config := meta.(*cfg.Config) + client, err := common.ClientFromCtx(ctx, dmsClientV2, func() (*golangsdk.ServiceClient, error) { + return config.DmsV2Client(config.GetRegion(d)) + }) + if err != nil { + return fmterr.Errorf(errCreationClientV2, err) + } + + instanceId := d.Get("instance_id").(string) + + createConsumerGroupOpts := management.CreateConsumerGroupOpts{ + GroupName: d.Get("group_name").(string), + Description: d.Get("description").(string), + } + + err = management.CreateConsumerGroup(client, instanceId, createConsumerGroupOpts) + if err != nil { + return diag.Errorf("error creating consumer group for Kafka instance: %s", err) + } + + group_id := fmt.Sprintf("%s/%s", instanceId, createConsumerGroupOpts.GroupName) + d.SetId(group_id) + + clientCtx := common.CtxWithClient(ctx, client, dmsClientV2) + return resourceDmsConsumerGroupV2Read(clientCtx, d, meta) +} + +func resourceDmsConsumerGroupV2Read(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + config := meta.(*cfg.Config) + client, err := common.ClientFromCtx(ctx, dmsClientV2, func() (*golangsdk.ServiceClient, error) { + return config.DmsV2Client(config.GetRegion(d)) + }) + if err != nil { + return fmterr.Errorf(errCreationClientV2, err) + } + + instanceId := d.Get("instance_id").(string) + groupName := d.Get("group_name").(string) + + getResp, err := management.GetConsumerGroup(client, instanceId, groupName) + if err != nil { + return common.CheckDeletedDiag(d, err, "DMS Kafka instance or consumer group not found") + } + + consumerGroup := getResp.Group + + mErr := multierror.Append( + nil, + d.Set("region", config.GetRegion(d)), + d.Set("instance_id", instanceId), + d.Set("group_name", consumerGroup.GroupId), + d.Set("state", consumerGroup.State), + d.Set("coordinator_id", consumerGroup.CoordinatorId), + d.Set("assignment_strategy", consumerGroup.AssignmentStrategy), + ) + + var memberList []map[string]interface{} + for _, memberRaw := range consumerGroup.Members { + member := make(map[string]interface{}) + member["host"] = memberRaw.Host + member["member_id"] = memberRaw.MemberId + member["client_id"] = memberRaw.ClientId + var assignmentList []map[string]interface{} + for _, assignmentRaw := range memberRaw.Assignment { + assignment := make(map[string]interface{}) + assignment["topic"] = assignmentRaw.Topic + assignment["partitions"] = assignmentRaw.Partitions + assignmentList = append(assignmentList, assignment) + } + member["assignments"] = assignmentList + memberList = append(memberList, member) + } + + mErr = multierror.Append( + mErr, + d.Set("members", memberList), + ) + + var groupMessageOffsets []map[string]interface{} + for _, groupMessageOffsetRaw := range consumerGroup.GroupMessageOffsets { + groupMessageOffset := map[string]interface{}{ + "partition": groupMessageOffsetRaw.Partition, + "lag": groupMessageOffsetRaw.Lag, + "topic": groupMessageOffsetRaw.Topic, + "message_current_offset": groupMessageOffsetRaw.MessageCurrentOffset, + "message_log_end_offset": groupMessageOffsetRaw.MessageLogEndOffset, + } + + groupMessageOffsets = append(groupMessageOffsets, groupMessageOffset) + } + + mErr = multierror.Append( + mErr, + d.Set("group_message_offsets", groupMessageOffsets), + ) + + return diag.FromErr(mErr.ErrorOrNil()) +} + +func resourceDmsConsumerGroupV2Delete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + config := meta.(*cfg.Config) + client, err := common.ClientFromCtx(ctx, dmsClientV2, func() (*golangsdk.ServiceClient, error) { + return config.DmsV2Client(config.GetRegion(d)) + }) + if err != nil { + return fmterr.Errorf(errCreationClientV2, err) + } + + instanceId := d.Get("instance_id").(string) + groupName := d.Get("group_name").(string) + + err = management.DeleteConsumerGroup(client, instanceId, groupName) + if err != nil { + return diag.Errorf("error deleting DMSv2 consumer group: %v", err) + } + + d.SetId("") + log.Printf("[DEBUG] DMS Kafka instance consumer '%s' group has been deleted", groupName) + return nil +} diff --git a/releasenotes/notes/ddm-add-resource-ddm-consumer-group-v2-664fb7201aec364b.yaml b/releasenotes/notes/ddm-add-resource-ddm-consumer-group-v2-664fb7201aec364b.yaml new file mode 100644 index 000000000..72acbdf44 --- /dev/null +++ b/releasenotes/notes/ddm-add-resource-ddm-consumer-group-v2-664fb7201aec364b.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + **[DMS]** Add new resource ``resource/opentelekomcloud_dms_consumer_group_v2`` (`#2733 `_).