From 96b0f34dc9b9c3fa4d1b742b0102f5ade61a6c2c Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Mon, 16 Oct 2023 07:31:20 -0700 Subject: [PATCH] Fix Kafka Partition Count (#641) - addresses `partitions` being returned by API Get/List operations instead of `partition_count` which is used for Create/Update operations Co-authored-by: Andrew Starr-Bochicchio --- databases.go | 29 ++++++-- databases_test.go | 179 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 176 insertions(+), 32 deletions(-) diff --git a/databases.go b/databases.go index fe0bac78..01e332e8 100644 --- a/databases.go +++ b/databases.go @@ -297,11 +297,26 @@ type DatabaseDB struct { // DatabaseTopic represents a Kafka topic type DatabaseTopic struct { - Name string `json:"name"` - PartitionCount *uint32 `json:"partition_count,omitempty"` - ReplicationFactor *uint32 `json:"replication_factor,omitempty"` - State string `json:"state,omitempty"` - Config *TopicConfig `json:"config,omitempty"` + Name string `json:"name"` + Partitions []*TopicPartition `json:"partitions,omitempty"` + ReplicationFactor *uint32 `json:"replication_factor,omitempty"` + State string `json:"state,omitempty"` + Config *TopicConfig `json:"config,omitempty"` +} + +// TopicPartition represents the state of a Kafka topic partition +type TopicPartition struct { + EarliestOffset uint64 `json:"earliest_offset,omitempty"` + InSyncReplicas uint32 `json:"in_sync_replicas,omitempty"` + Id uint32 `json:"id,omitempty"` + Size uint64 `json:"size,omitempty"` + ConsumerGroups []*TopicConsumerGroup `json:"consumer_groups,omitempty"` +} + +// TopicConsumerGroup represents a consumer group for a particular Kafka topic +type TopicConsumerGroup struct { + Name string `json:"name,omitempty"` + Offset uint64 `json:"offset,omitempty"` } // TopicConfig represents all configurable options for a Kafka topic @@ -342,7 +357,9 @@ type DatabaseCreateTopicRequest struct { // DatabaseUpdateTopicRequest ... type DatabaseUpdateTopicRequest struct { - Topic *DatabaseTopic `json:"topic"` // note: `name` field in Topic unused on update + PartitionCount *uint32 `json:"partition_count,omitempty"` + ReplicationFactor *uint32 `json:"replication_factor,omitempty"` + Config *TopicConfig `json:"config,omitempty"` } // DatabaseReplica represents a read-only replica of a particular database diff --git a/databases_test.go b/databases_test.go index 43a2e0ff..c8df184d 100644 --- a/databases_test.go +++ b/databases_test.go @@ -2323,7 +2323,6 @@ func TestDatabases_CreateTopic(t *testing.T) { want := &DatabaseTopic{ Name: "events", - PartitionCount: &numPartitions, ReplicationFactor: &replicationFactor, Config: &TopicConfig{ RetentionMS: &retentionMS, @@ -2392,12 +2391,10 @@ func TestDatabases_UpdateTopic(t *testing.T) { }) _, err := client.Databases.UpdateTopic(ctx, dbID, topicName, &DatabaseUpdateTopicRequest{ - Topic: &DatabaseTopic{ - PartitionCount: &numPartitions, - ReplicationFactor: &replicationFactor, - Config: &TopicConfig{ - RetentionMS: &retentionMS, - }, + PartitionCount: &numPartitions, + ReplicationFactor: &replicationFactor, + Config: &TopicConfig{ + RetentionMS: &retentionMS, }, }) @@ -2430,14 +2427,35 @@ func TestDatabases_GetTopic(t *testing.T) { var ( dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" topicName = "events" - numPartitions = uint32(3) replicationFactor = uint32(2) retentionMS = int64(1000 * 60) ) want := &DatabaseTopic{ - Name: "events", - PartitionCount: &numPartitions, + Name: "events", + Partitions: []*TopicPartition{ + { + Size: 0, + Id: 0, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 1, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 2, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + }, ReplicationFactor: &replicationFactor, Config: &TopicConfig{ RetentionMS: &retentionMS, @@ -2445,15 +2463,37 @@ func TestDatabases_GetTopic(t *testing.T) { } body := `{ - "topic": { - "name": "events", - "partition_count": 3, - "replication_factor": 2, - "config": { - "retention_ms": 60000 - } - } - }` + "topic":{ + "name":"events", + "replication_factor":2, + "config":{ + "retention_ms":60000 + }, + "partitions":[ + { + "size":0, + "id":0, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":1, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":2, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + } + ] + } + }` path := fmt.Sprintf("/v2/databases/%s/topics/%s", dbID, topicName) @@ -2473,23 +2513,66 @@ func TestDatabases_ListTopics(t *testing.T) { var ( dbID = "deadbeef-dead-4aa5-beef-deadbeef347d" - numPartitions = uint32(3) replicationFactor = uint32(2) retentionMS = int64(1000 * 60) ) want := []DatabaseTopic{ { - Name: "events", - PartitionCount: &numPartitions, + Name: "events", + Partitions: []*TopicPartition{ + { + Size: 0, + Id: 0, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 1, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 2, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + }, ReplicationFactor: &replicationFactor, Config: &TopicConfig{ RetentionMS: &retentionMS, }, }, { - Name: "events_ii", - PartitionCount: &numPartitions, + Name: "events_ii", + Partitions: []*TopicPartition{ + { + Size: 0, + Id: 0, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 1, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + { + Size: 0, + Id: 2, + InSyncReplicas: 2, + EarliestOffset: 0, + ConsumerGroups: nil, + }, + }, ReplicationFactor: &replicationFactor, Config: &TopicConfig{ RetentionMS: &retentionMS, @@ -2501,7 +2584,29 @@ func TestDatabases_ListTopics(t *testing.T) { "topics": [ { "name": "events", - "partition_count": 3, + "partitions":[ + { + "size":0, + "id":0, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":1, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":2, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + } + ], "replication_factor": 2, "config": { "retention_ms": 60000 @@ -2509,7 +2614,29 @@ func TestDatabases_ListTopics(t *testing.T) { }, { "name": "events_ii", - "partition_count": 3, + "partitions":[ + { + "size":0, + "id":0, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":1, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + }, + { + "size":0, + "id":2, + "in_sync_replicas":2, + "earliest_offset":0, + "consumer_groups":null + } + ], "replication_factor": 2, "config": { "retention_ms": 60000